From 5237533a8cfa12153a85a76d1978544093f31a2b Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 30 Jun 2014 12:51:36 -0700 Subject: [PATCH] WBE notification message validation Add send and receive validation of the notify message that is sent between executors and workers to be more robust around invalid message formats being sent and received. Part of blueprint wbe-message-validation Change-Id: I7300d6f2d00e48c4f989c7f958a028bdff4afdd4 --- requirements.txt | 1 + taskflow/engines/worker_based/executor.py | 6 ++- taskflow/engines/worker_based/protocol.py | 45 +++++++++++++++++++++++ taskflow/engines/worker_based/server.py | 5 ++- 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 21fc544a..764c2ad2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,3 +10,4 @@ Babel>=1.3 stevedore>=0.14 # Backport for concurrent.futures which exists in 3.2+ futures>=2.1.3 +jsonschema>=2.0.0,<3.0.0 diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 8bd799c4..ba599e69 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import functools import logging from taskflow.engines.action_engine import executor @@ -74,7 +75,10 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): self._requests_cache = cache.RequestsCache() self._workers_cache = cache.WorkersCache() handlers = { - pr.NOTIFY: self._process_notify, + pr.NOTIFY: [ + self._process_notify, + functools.partial(pr.Notify.validate, response=True), + ], pr.RESPONSE: self._process_response, } self._proxy = proxy.Proxy(uuid, exchange, handlers, diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 1eb43227..c107af65 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -17,9 +17,12 @@ import abc from concurrent import futures +import jsonschema +from jsonschema import exceptions as schema_exc import six from taskflow.engines.action_engine import executor +from taskflow import exceptions as excp from taskflow.types import time from taskflow.utils import misc from taskflow.utils import reflection @@ -78,12 +81,54 @@ class Notify(Message): """Represents notify message type.""" TYPE = NOTIFY + # NOTE(harlowja): the executor (the entity who initially requests a worker + # to send back a notification response) schema is different than the + # worker response schema (that's why there are two schemas here). + _RESPONSE_SCHEMA = { + "type": "object", + 'properties': { + 'topic': { + "type": "string", + }, + 'tasks': { + "type": "array", + "items": { + "type": "string", + }, + } + }, + "required": ["topic", 'tasks'], + "additionalProperties": False, + } + _SENDER_SCHEMA = { + "type": "object", + "additionalProperties": False, + } + def __init__(self, **data): self._data = data def to_dict(self): return self._data + @classmethod + def validate(cls, data, response): + if response: + schema = cls._RESPONSE_SCHEMA + else: + schema = cls._SENDER_SCHEMA + try: + jsonschema.validate(data, schema) + except schema_exc.ValidationError as e: + if response: + raise excp.InvalidFormat("%s message response data not of the" + " expected format: %s" + % (cls.TYPE, e.message), e) + else: + raise excp.InvalidFormat("%s message sender data not of the" + " expected format: %s" + % (cls.TYPE, e.message), e) + class Request(Message): """Represents request with execution results. diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 7f10113f..1e86fb23 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -45,7 +45,10 @@ class Server(object): def __init__(self, topic, exchange, executor, endpoints, **kwargs): handlers = { - pr.NOTIFY: delayed(executor)(self._process_notify), + pr.NOTIFY: [ + delayed(executor)(self._process_notify), + functools.partial(pr.Notify.validate, response=False), + ], pr.REQUEST: delayed(executor)(self._process_request), } self._proxy = proxy.Proxy(topic, exchange, handlers,