Merge "Allow WBE request transition timeout to be dynamic"

This commit is contained in:
Jenkins
2014-09-23 01:12:44 +00:00
committed by Gerrit Code Review
3 changed files with 24 additions and 9 deletions

View File

@@ -16,6 +16,7 @@
from taskflow.engines.action_engine import engine
from taskflow.engines.worker_based import executor
from taskflow.engines.worker_based import protocol as pr
from taskflow import storage as t_storage
@@ -30,8 +31,15 @@ class WorkerBasedActionEngine(engine.ActionEngine):
:param topics: list of workers topics to communicate with (this will also
be learned by listening to the notifications that workers
emit).
:keyword transport: transport to be used (e.g. amqp, memory, etc.)
:keyword transport_options: transport specific options
:param transport: transport to be used (e.g. amqp, memory, etc.)
:param transport_options: transport specific options
:param transition_timeout: numeric value (or None for infinite) to wait
for submitted remote requests to transition out
of the (PENDING, WAITING) request states. When
expired the associated task the request was made
for will have its result become a
`RequestTimeout` exception instead of its
normally returned value (or raised exception).
"""
_storage_factory = t_storage.SingleThreadedStorage
@@ -45,7 +53,9 @@ class WorkerBasedActionEngine(engine.ActionEngine):
exchange=self._conf.get('exchange', 'default'),
topics=self._conf.get('topics', []),
transport=self._conf.get('transport'),
transport_options=self._conf.get('transport_options'))
transport_options=self._conf.get('transport_options'),
transition_timeout=self._conf.get('transition_timeout',
pr.REQUEST_TIMEOUT))
def __init__(self, flow, flow_detail, backend, conf, **kwargs):
super(WorkerBasedActionEngine, self).__init__(

View File

@@ -71,10 +71,12 @@ class PeriodicWorker(object):
class WorkerTaskExecutor(executor.TaskExecutorBase):
"""Executes tasks on remote workers."""
def __init__(self, uuid, exchange, topics, **kwargs):
def __init__(self, uuid, exchange, topics,
transition_timeout=pr.REQUEST_TIMEOUT, **kwargs):
self._uuid = uuid
self._topics = topics
self._requests_cache = cache.RequestsCache()
self._transition_timeout = transition_timeout
self._workers_cache = cache.WorkersCache()
self._workers_arrival = threading.Condition()
handlers = {
@@ -172,10 +174,11 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
self._requests_cache.cleanup(self._handle_expired_request)
def _submit_task(self, task, task_uuid, action, arguments,
progress_callback, timeout=pr.REQUEST_TIMEOUT, **kwargs):
progress_callback, **kwargs):
"""Submit task request to a worker."""
request = pr.Request(task, task_uuid, action, arguments,
progress_callback, timeout, **kwargs)
progress_callback, self._transition_timeout,
**kwargs)
# Get task's topic and publish request if topic was found.
topic = self._workers_cache.get_topic_by_task(request.task_cls)

View File

@@ -46,7 +46,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
exchange='default',
topics=[],
transport=None,
transport_options=None)
transport_options=None,
transition_timeout=mock.ANY)
]
self.assertEqual(self.master_mock.mock_calls, expected_calls)
@@ -55,7 +56,7 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
_, flow_detail = pu.temporary_flow_detail()
config = {'url': self.broker_url, 'exchange': self.exchange,
'topics': self.topics, 'transport': 'memory',
'transport_options': {}}
'transport_options': {}, 'transition_timeout': 200}
engine.WorkerBasedActionEngine(
flow, flow_detail, None, config).compile()
@@ -65,6 +66,7 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
exchange=self.exchange,
topics=self.topics,
transport='memory',
transport_options={})
transport_options={},
transition_timeout=200)
]
self.assertEqual(self.master_mock.mock_calls, expected_calls)