diff --git a/doc/source/workers.rst b/doc/source/workers.rst index f02caca7..e54d38eb 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -31,11 +31,12 @@ Executor these requests can be accepted and processed by remote workers. Worker - Workers are started on remote hosts and has list of tasks it can perform (on - request). Workers accept and process task requests that are published by an - executor. Several requests can be processed simultaneously in separate - threads. For example, an `executor`_ can be passed to the worker and - configured to run in as many threads (green or not) as desired. + Workers are started on remote hosts and each has a list of tasks it can + perform (on request). Workers accept and process task requests that are + published by an executor. Several requests can be processed simultaneously + in separate threads (or processes...). For example, an `executor`_ can be + passed to the worker and configured to run in as many threads (green or + not) as desired. Proxy Executors interact with workers via a proxy. The proxy maintains the @@ -153,8 +154,16 @@ engine executor in the following manner: from dicts after receiving on both executor & worker sides (this translation is lossy since the traceback won't be fully retained). -Executor execute format -~~~~~~~~~~~~~~~~~~~~~~~ +Protocol +~~~~~~~~ + +.. automodule:: taskflow.engines.worker_based.protocol + +Examples +~~~~~~~~ + +Request (execute) +""""""""""""""""" * **task_name** - full task name to be performed * **task_cls** - full task class name to be performed @@ -186,8 +195,52 @@ Additionally, the following parameters are added to the request message: ] } -Worker response format -~~~~~~~~~~~~~~~~~~~~~~ + +Request (revert) +"""""""""""""""" + +When **reverting:** + +.. code:: json + + { + "action": "revert", + "arguments": {}, + "failures": { + "taskflow.tests.utils.TaskWithFailure": { + "exc_type_names": [ + "RuntimeError", + "StandardError", + "Exception" + ], + "exception_str": "Woot!", + "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n", + "version": 1 + } + }, + "result": [ + "failure", + { + "exc_type_names": [ + "RuntimeError", + "StandardError", + "Exception" + ], + "exception_str": "Woot!", + "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n", + "version": 1 + } + ], + "task_cls": "taskflow.tests.utils.TaskWithFailure", + "task_name": "taskflow.tests.utils.TaskWithFailure", + "task_version": [ + 1, + 0 + ] + } + +Worker response(s) +"""""""""""""""""" When **running:** @@ -241,49 +294,6 @@ When **failed:** "state": "FAILURE" } -Executor revert format -~~~~~~~~~~~~~~~~~~~~~~ - -When **reverting:** - -.. code:: json - - { - "action": "revert", - "arguments": {}, - "failures": { - "taskflow.tests.utils.TaskWithFailure": { - "exc_type_names": [ - "RuntimeError", - "StandardError", - "Exception" - ], - "exception_str": "Woot!", - "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n", - "version": 1 - } - }, - "result": [ - "failure", - { - "exc_type_names": [ - "RuntimeError", - "StandardError", - "Exception" - ], - "exception_str": "Woot!", - "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n", - "version": 1 - } - ], - "task_cls": "taskflow.tests.utils.TaskWithFailure", - "task_name": "taskflow.tests.utils.TaskWithFailure", - "task_version": [ - 1, - 0 - ] - } - Request state transitions ------------------------- diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 0bb57a04..8a137471 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -121,12 +121,16 @@ class Message(object): class Notify(Message): """Represents notify message type.""" + + #: String constant representing this message type. TYPE = NOTIFY # NOTE(harlowja): the executor (the entity who initially requests a worker # to send back a notification response) schema is different than the # worker response schema (that's why there are two schemas here). - _RESPONSE_SCHEMA = { + + #: Expected notify *response* message schema (in json schema format). + RESPONSE_SCHEMA = { "type": "object", 'properties': { 'topic': { @@ -142,7 +146,9 @@ class Notify(Message): "required": ["topic", 'tasks'], "additionalProperties": False, } - _SENDER_SCHEMA = { + + #: Expected *sender* request message schema (in json schema format). + SENDER_SCHEMA = { "type": "object", "additionalProperties": False, } @@ -156,9 +162,9 @@ class Notify(Message): @classmethod def validate(cls, data, response): if response: - schema = cls._RESPONSE_SCHEMA + schema = cls.RESPONSE_SCHEMA else: - schema = cls._SENDER_SCHEMA + schema = cls.SENDER_SCHEMA try: jsonschema.validate(data, schema, types=_SCHEMA_TYPES) except schema_exc.ValidationError as e: @@ -180,8 +186,11 @@ class Request(Message): states. """ + #: String constant representing this message type. TYPE = REQUEST - _SCHEMA = { + + #: Expected message schema (in json schema format). + SCHEMA = { "type": "object", 'properties': { # These two are typically only sent on revert actions (that is @@ -349,7 +358,7 @@ class Request(Message): @classmethod def validate(cls, data): try: - jsonschema.validate(data, cls._SCHEMA, types=_SCHEMA_TYPES) + jsonschema.validate(data, cls.SCHEMA, types=_SCHEMA_TYPES) except schema_exc.ValidationError as e: raise excp.InvalidFormat("%s message response data not of the" " expected format: %s" @@ -358,8 +367,12 @@ class Request(Message): class Response(Message): """Represents response message type.""" + + #: String constant representing this message type. TYPE = RESPONSE - _SCHEMA = { + + #: Expected message schema (in json schema format). + SCHEMA = { "type": "object", 'properties': { 'state': { @@ -442,7 +455,7 @@ class Response(Message): @classmethod def validate(cls, data): try: - jsonschema.validate(data, cls._SCHEMA, types=_SCHEMA_TYPES) + jsonschema.validate(data, cls.SCHEMA, types=_SCHEMA_TYPES) except schema_exc.ValidationError as e: raise excp.InvalidFormat("%s message response data not of the" " expected format: %s"