Merge "Make the runner a runtime provided property"
This commit is contained in:
@@ -18,7 +18,6 @@ import threading
|
|||||||
|
|
||||||
from taskflow.engines.action_engine import compiler
|
from taskflow.engines.action_engine import compiler
|
||||||
from taskflow.engines.action_engine import executor
|
from taskflow.engines.action_engine import executor
|
||||||
from taskflow.engines.action_engine import runner
|
|
||||||
from taskflow.engines.action_engine import runtime
|
from taskflow.engines.action_engine import runtime
|
||||||
from taskflow.engines import base
|
from taskflow.engines import base
|
||||||
|
|
||||||
@@ -53,7 +52,6 @@ class ActionEngine(base.EngineBase):
|
|||||||
|
|
||||||
def __init__(self, flow, flow_detail, backend, conf):
|
def __init__(self, flow, flow_detail, backend, conf):
|
||||||
super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
|
super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
|
||||||
self._runner = None
|
|
||||||
self._runtime = None
|
self._runtime = None
|
||||||
self._compiled = False
|
self._compiled = False
|
||||||
self._compilation = None
|
self._compilation = None
|
||||||
@@ -116,9 +114,10 @@ class ActionEngine(base.EngineBase):
|
|||||||
self.prepare()
|
self.prepare()
|
||||||
self._task_executor.start()
|
self._task_executor.start()
|
||||||
state = None
|
state = None
|
||||||
|
runner = self._runtime.runner
|
||||||
try:
|
try:
|
||||||
self._change_state(states.RUNNING)
|
self._change_state(states.RUNNING)
|
||||||
for state in self._runner.run_iter(timeout=timeout):
|
for state in runner.run_iter(timeout=timeout):
|
||||||
try:
|
try:
|
||||||
try_suspend = yield state
|
try_suspend = yield state
|
||||||
except GeneratorExit:
|
except GeneratorExit:
|
||||||
@@ -130,7 +129,7 @@ class ActionEngine(base.EngineBase):
|
|||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
self._change_state(states.FAILURE)
|
self._change_state(states.FAILURE)
|
||||||
else:
|
else:
|
||||||
ignorable_states = getattr(self._runner, 'ignorable_states', [])
|
ignorable_states = getattr(runner, 'ignorable_states', [])
|
||||||
if state and state not in ignorable_states:
|
if state and state not in ignorable_states:
|
||||||
self._change_state(state)
|
self._change_state(state)
|
||||||
if state != states.SUSPENDED and state != states.SUCCESS:
|
if state != states.SUSPENDED and state != states.SUCCESS:
|
||||||
@@ -214,7 +213,6 @@ class ActionEngine(base.EngineBase):
|
|||||||
self.storage,
|
self.storage,
|
||||||
self.task_notifier,
|
self.task_notifier,
|
||||||
self._task_executor)
|
self._task_executor)
|
||||||
self._runner = runner.Runner(self._runtime, self._task_executor)
|
|
||||||
self._compiled = True
|
self._compiled = True
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ from taskflow.utils import misc
|
|||||||
from taskflow.engines.action_engine import analyzer as ca
|
from taskflow.engines.action_engine import analyzer as ca
|
||||||
from taskflow.engines.action_engine import executor as ex
|
from taskflow.engines.action_engine import executor as ex
|
||||||
from taskflow.engines.action_engine import retry_action as ra
|
from taskflow.engines.action_engine import retry_action as ra
|
||||||
|
from taskflow.engines.action_engine import runner as ru
|
||||||
from taskflow.engines.action_engine import task_action as ta
|
from taskflow.engines.action_engine import task_action as ta
|
||||||
|
|
||||||
|
|
||||||
@@ -50,6 +51,10 @@ class Runtime(object):
|
|||||||
def analyzer(self):
|
def analyzer(self):
|
||||||
return ca.Analyzer(self._compilation, self._storage)
|
return ca.Analyzer(self._compilation, self._storage)
|
||||||
|
|
||||||
|
@misc.cachedproperty
|
||||||
|
def runner(self):
|
||||||
|
return ru.Runner(self, self._task_executor)
|
||||||
|
|
||||||
@misc.cachedproperty
|
@misc.cachedproperty
|
||||||
def completer(self):
|
def completer(self):
|
||||||
return Completer(self)
|
return Completer(self)
|
||||||
|
|||||||
Reference in New Issue
Block a user