diff --git a/doc/source/workers.rst b/doc/source/workers.rst index 86548992..7c4f0112 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -416,7 +416,6 @@ Interfaces ========== .. automodule:: taskflow.engines.worker_based.engine -.. automodule:: taskflow.engines.worker_based.executor .. automodule:: taskflow.engines.worker_based.proxy .. automodule:: taskflow.engines.worker_based.worker diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index a161ee58..aee39e89 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -32,7 +32,6 @@ class WorkerBasedActionEngine(engine.ActionEngine): be learned by listening to the notifications that workers emit). :param transport: transport to be used (e.g. amqp, memory, etc.) - :param transport_options: transport specific options :param transition_timeout: numeric value (or None for infinite) to wait for submitted remote requests to transition out of the (PENDING, WAITING) request states. When @@ -40,8 +39,11 @@ class WorkerBasedActionEngine(engine.ActionEngine): for will have its result become a `RequestTimeout` exception instead of its normally returned value (or raised exception). - :param retry_options: retry specific options (used to configure how kombu - handles retrying under tolerable/transient failures). + :param transport_options: transport specific options (see: + http://kombu.readthedocs.org/ for what these + options imply and are expected to be) + :param retry_options: retry specific options + (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`) """ _storage_factory = t_storage.SingleThreadedStorage diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 430be79f..f4046234 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -47,12 +47,7 @@ class Proxy(object): For **internal** usage only (not for public consumption). """ - # Settings that are by default used for consumers/producers to reconnect - # under tolerable/transient failures... - # - # See: http://kombu.readthedocs.org/en/latest/reference/kombu.html for - # what these values imply... - _DEFAULT_RETRY_OPTIONS = { + DEFAULT_RETRY_OPTIONS = { # The number of seconds we start sleeping for. 'interval_start': 1, # How many seconds added to the interval for each retry. @@ -62,6 +57,12 @@ class Proxy(object): # Maximum number of times to retry. 'max_retries': 3, } + """Settings used (by default) to reconnect under transient failures. + + See: http://kombu.readthedocs.org/ (and connection ``ensure_options``) for + what these values imply/mean... + """ + # This is the only provided option that should be an int, the others # are allowed to be floats; used when we check that the user-provided # value is valid... @@ -81,7 +82,7 @@ class Proxy(object): # running, otherwise requeue them. lambda data, message: not self.is_running) - ensure_options = self._DEFAULT_RETRY_OPTIONS.copy() + ensure_options = self.DEFAULT_RETRY_OPTIONS.copy() if retry_options is not None: # Override the defaults with any user provided values... for k in set(six.iterkeys(ensure_options)): diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index 4273af60..2110b92b 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -81,9 +81,11 @@ class Worker(object): default executor (used only if an executor is not passed in) :param transport: transport to be used (e.g. amqp, memory, etc.) - :param transport_options: transport specific options - :param retry_options: retry specific options (used to configure how kombu - handles retrying under tolerable/transient failures). + :param transport_options: transport specific options (see: + http://kombu.readthedocs.org/ for what these + options imply and are expected to be) + :param retry_options: retry specific options + (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`) """ def __init__(self, exchange, topic, tasks,