Stop returning atoms from execute/revert methods
It is not needed to return the atom that was executed from the futures result() method, since we can just as easily set an attribute on the future and reference it from there when using it later. This is also required for a process based executor since it is not typically possible to send back a raw task object (and is not desireable to require this); even if it was possible the task would be pickled and unpickled multiple times so when this happens it can not be guaranteed to even be the same object (in fact it is not). Part of blueprint process-executor Change-Id: I4a05ea5dcdef97218312e3a88ed4a1dfdf1b1edf
This commit is contained in:
committed by
Joshua Harlow
parent
dc4262e587
commit
5f0b514a14
@@ -76,7 +76,7 @@ class RetryAction(object):
|
||||
result = retry.execute(**kwargs)
|
||||
except Exception:
|
||||
result = failure.Failure()
|
||||
return (retry, ex.EXECUTED, result)
|
||||
return (ex.EXECUTED, result)
|
||||
|
||||
def _on_done_callback(fut):
|
||||
result = fut.result()[-1]
|
||||
@@ -89,6 +89,7 @@ class RetryAction(object):
|
||||
fut = self._executor.submit(_execute_retry,
|
||||
self._get_retry_args(retry))
|
||||
fut.add_done_callback(_on_done_callback)
|
||||
fut.atom = retry
|
||||
return fut
|
||||
|
||||
def revert(self, retry):
|
||||
@@ -98,7 +99,7 @@ class RetryAction(object):
|
||||
result = retry.revert(**kwargs)
|
||||
except Exception:
|
||||
result = failure.Failure()
|
||||
return (retry, ex.REVERTED, result)
|
||||
return (ex.REVERTED, result)
|
||||
|
||||
def _on_done_callback(fut):
|
||||
result = fut.result()[-1]
|
||||
@@ -115,6 +116,7 @@ class RetryAction(object):
|
||||
self._get_retry_args(retry,
|
||||
addons=arg_addons))
|
||||
fut.add_done_callback(_on_done_callback)
|
||||
fut.atom = retry
|
||||
return fut
|
||||
|
||||
def on_failure(self, retry, atom, last_failure):
|
||||
|
||||
@@ -40,7 +40,7 @@ def _execute_task(task, arguments, progress_callback):
|
||||
result = failure.Failure()
|
||||
finally:
|
||||
task.post_execute()
|
||||
return (task, EXECUTED, result)
|
||||
return (EXECUTED, result)
|
||||
|
||||
|
||||
def _revert_task(task, arguments, result, failures, progress_callback):
|
||||
@@ -57,7 +57,7 @@ def _revert_task(task, arguments, result, failures, progress_callback):
|
||||
result = failure.Failure()
|
||||
finally:
|
||||
task.post_revert()
|
||||
return (task, REVERTED, result)
|
||||
return (REVERTED, result)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
@@ -98,13 +98,17 @@ class SerialTaskExecutor(TaskExecutorBase):
|
||||
self._executor = futures.SynchronousExecutor()
|
||||
|
||||
def execute_task(self, task, task_uuid, arguments, progress_callback=None):
|
||||
return self._executor.submit(_execute_task, task, arguments,
|
||||
progress_callback)
|
||||
fut = self._executor.submit(_execute_task, task, arguments,
|
||||
progress_callback)
|
||||
fut.atom = task
|
||||
return fut
|
||||
|
||||
def revert_task(self, task, task_uuid, arguments, result, failures,
|
||||
progress_callback=None):
|
||||
return self._executor.submit(_revert_task, task, arguments, result,
|
||||
failures, progress_callback)
|
||||
fut = self._executor.submit(_revert_task, task, arguments, result,
|
||||
failures, progress_callback)
|
||||
fut.atom = task
|
||||
return fut
|
||||
|
||||
def wait_for_any(self, fs, timeout=None):
|
||||
return async_utils.wait_for_any(fs, timeout)
|
||||
@@ -123,14 +127,17 @@ class ParallelTaskExecutor(TaskExecutorBase):
|
||||
self._create_executor = executor is None
|
||||
|
||||
def execute_task(self, task, task_uuid, arguments, progress_callback=None):
|
||||
return self._executor.submit(
|
||||
_execute_task, task, arguments, progress_callback)
|
||||
fut = self._executor.submit(_execute_task, task,
|
||||
arguments, progress_callback)
|
||||
fut.atom = task
|
||||
return fut
|
||||
|
||||
def revert_task(self, task, task_uuid, arguments, result, failures,
|
||||
progress_callback=None):
|
||||
return self._executor.submit(
|
||||
_revert_task, task,
|
||||
arguments, result, failures, progress_callback)
|
||||
fut = self._executor.submit(_revert_task, task, arguments,
|
||||
result, failures, progress_callback)
|
||||
fut.atom = task
|
||||
return fut
|
||||
|
||||
def wait_for_any(self, fs, timeout=None):
|
||||
return async_utils.wait_for_any(fs, timeout)
|
||||
|
||||
@@ -129,8 +129,9 @@ class _MachineBuilder(object):
|
||||
next_nodes = set()
|
||||
while memory.done:
|
||||
fut = memory.done.pop()
|
||||
node = fut.atom
|
||||
try:
|
||||
node, event, result = fut.result()
|
||||
event, result = fut.result()
|
||||
retain = self._completer.complete(node, event, result)
|
||||
if retain and isinstance(result, failure.Failure):
|
||||
memory.failures.append(result)
|
||||
|
||||
@@ -40,11 +40,11 @@ class Endpoint(object):
|
||||
return self._task_cls(name=name)
|
||||
|
||||
def execute(self, task_name, **kwargs):
|
||||
task, event, result = self._executor.execute_task(
|
||||
self._get_task(task_name), **kwargs).result()
|
||||
task = self._get_task(task_name)
|
||||
event, result = self._executor.execute_task(task, **kwargs).result()
|
||||
return result
|
||||
|
||||
def revert(self, task_name, **kwargs):
|
||||
task, event, result = self._executor.revert_task(
|
||||
self._get_task(task_name), **kwargs).result()
|
||||
task = self._get_task(task_name)
|
||||
event, result = self._executor.revert_task(task, **kwargs).result()
|
||||
return result
|
||||
|
||||
@@ -234,6 +234,7 @@ class Request(Message):
|
||||
self._lock = threading.Lock()
|
||||
self._created_on = timeutils.utcnow()
|
||||
self.result = futures.Future()
|
||||
self.result.atom = task
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
@@ -290,7 +291,7 @@ class Request(Message):
|
||||
return request
|
||||
|
||||
def set_result(self, result):
|
||||
self.result.set_result((self._task, self._event, result))
|
||||
self.result.set_result((self._event, result))
|
||||
|
||||
def on_progress(self, event_data, progress):
|
||||
self._progress_callback(self._task, event_data, progress)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
from concurrent import futures
|
||||
|
||||
from taskflow.engines.action_engine import executor as base_executor
|
||||
from taskflow.engines.worker_based import endpoint
|
||||
from taskflow.engines.worker_based import executor as worker_executor
|
||||
from taskflow.engines.worker_based import server as worker_server
|
||||
@@ -73,13 +74,14 @@ class TestPipeline(test.TestCase):
|
||||
self.assertEqual(0, executor.wait_for_workers(timeout=WAIT_TIMEOUT))
|
||||
|
||||
t = test_utils.TaskOneReturn()
|
||||
f = executor.execute_task(t, uuidutils.generate_uuid(), {})
|
||||
progress_callback = lambda *args, **kwargs: None
|
||||
f = executor.execute_task(t, uuidutils.generate_uuid(), {},
|
||||
progress_callback=progress_callback)
|
||||
executor.wait_for_any([f])
|
||||
|
||||
t2, _action, result = f.result()
|
||||
|
||||
event, result = f.result()
|
||||
self.assertEqual(1, result)
|
||||
self.assertEqual(t, t2)
|
||||
self.assertEqual(base_executor.EXECUTED, event)
|
||||
|
||||
def test_execution_failure_pipeline(self):
|
||||
task_classes = [
|
||||
@@ -88,9 +90,12 @@ class TestPipeline(test.TestCase):
|
||||
executor, server = self._start_components(task_classes)
|
||||
|
||||
t = test_utils.TaskWithFailure()
|
||||
f = executor.execute_task(t, uuidutils.generate_uuid(), {})
|
||||
progress_callback = lambda *args, **kwargs: None
|
||||
f = executor.execute_task(t, uuidutils.generate_uuid(), {},
|
||||
progress_callback=progress_callback)
|
||||
executor.wait_for_any([f])
|
||||
|
||||
_t2, _action, result = f.result()
|
||||
action, result = f.result()
|
||||
self.assertIsInstance(result, failure.Failure)
|
||||
self.assertEqual(RuntimeError, result.check(RuntimeError))
|
||||
self.assertEqual(base_executor.EXECUTED, action)
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
from concurrent import futures
|
||||
from oslo.utils import timeutils
|
||||
|
||||
from taskflow.engines.action_engine import executor
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.openstack.common import uuidutils
|
||||
@@ -182,7 +183,7 @@ class TestProtocol(test.TestCase):
|
||||
request = self.request()
|
||||
request.set_result(111)
|
||||
result = request.result.result()
|
||||
self.assertEqual(result, (self.task, 'executed', 111))
|
||||
self.assertEqual(result, (executor.EXECUTED, 111))
|
||||
|
||||
def test_on_progress(self):
|
||||
progress_callback = mock.MagicMock(name='progress_callback')
|
||||
|
||||
Reference in New Issue
Block a user