WBE notification message validation

Add send and receive validation of the notify
message that is sent between executors and workers
to be more robust around invalid message formats
being sent and received.

Part of blueprint wbe-message-validation

Change-Id: I7300d6f2d00e48c4f989c7f958a028bdff4afdd4
This commit is contained in:
Joshua Harlow
2014-06-30 12:51:36 -07:00
parent 9bce85dc66
commit 5237533a8c
4 changed files with 55 additions and 2 deletions

View File

@@ -10,3 +10,4 @@ Babel>=1.3
stevedore>=0.14
# Backport for concurrent.futures which exists in 3.2+
futures>=2.1.3
jsonschema>=2.0.0,<3.0.0

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
from taskflow.engines.action_engine import executor
@@ -74,7 +75,10 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
self._requests_cache = cache.RequestsCache()
self._workers_cache = cache.WorkersCache()
handlers = {
pr.NOTIFY: self._process_notify,
pr.NOTIFY: [
self._process_notify,
functools.partial(pr.Notify.validate, response=True),
],
pr.RESPONSE: self._process_response,
}
self._proxy = proxy.Proxy(uuid, exchange, handlers,

View File

@@ -17,9 +17,12 @@
import abc
from concurrent import futures
import jsonschema
from jsonschema import exceptions as schema_exc
import six
from taskflow.engines.action_engine import executor
from taskflow import exceptions as excp
from taskflow.types import time
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -78,12 +81,54 @@ class Notify(Message):
"""Represents notify message type."""
TYPE = NOTIFY
# NOTE(harlowja): the executor (the entity who initially requests a worker
# to send back a notification response) schema is different than the
# worker response schema (that's why there are two schemas here).
_RESPONSE_SCHEMA = {
"type": "object",
'properties': {
'topic': {
"type": "string",
},
'tasks': {
"type": "array",
"items": {
"type": "string",
},
}
},
"required": ["topic", 'tasks'],
"additionalProperties": False,
}
_SENDER_SCHEMA = {
"type": "object",
"additionalProperties": False,
}
def __init__(self, **data):
self._data = data
def to_dict(self):
return self._data
@classmethod
def validate(cls, data, response):
if response:
schema = cls._RESPONSE_SCHEMA
else:
schema = cls._SENDER_SCHEMA
try:
jsonschema.validate(data, schema)
except schema_exc.ValidationError as e:
if response:
raise excp.InvalidFormat("%s message response data not of the"
" expected format: %s"
% (cls.TYPE, e.message), e)
else:
raise excp.InvalidFormat("%s message sender data not of the"
" expected format: %s"
% (cls.TYPE, e.message), e)
class Request(Message):
"""Represents request with execution results.

View File

@@ -45,7 +45,10 @@ class Server(object):
def __init__(self, topic, exchange, executor, endpoints, **kwargs):
handlers = {
pr.NOTIFY: delayed(executor)(self._process_notify),
pr.NOTIFY: [
delayed(executor)(self._process_notify),
functools.partial(pr.Notify.validate, response=False),
],
pr.REQUEST: delayed(executor)(self._process_request),
}
self._proxy = proxy.Proxy(topic, exchange, handlers,