Merge "[task results]Add some validation for importing task results"

Jenkins 2017-10-11 10:43:23 +00:00 committed by Gerrit Code Review
commit da32f93e04
7 changed files with 492 additions and 314 deletions

View File

@@ -27,6 +27,7 @@ import jsonschema
from oslo_config import cfg
import requests
from requests.packages import urllib3
import six
from rally.common import logging
from rally.common import objects
@@ -525,7 +526,14 @@ class _Task(APIGroup):
task_uuid, status=consts.TaskStatus.FINISHED)
def import_results(self, deployment, task_results, tags=None):
"""Import json results of a task into database."""
"""Import json results of a task into rally database"""
try:
jsonschema.validate(task_results, objects.task.TASK_SCHEMA)
except jsonschema.ValidationError as e:
msg = six.text_type(e)
raise exceptions.RallyException(
"ERROR: Invalid task result format\n\n\t%s" % msg)
deployment = objects.Deployment.get(deployment)
if deployment["status"] != consts.DeployStatus.DEPLOY_FINISHED:
raise exceptions.DeploymentNotFinishedStatus(
@@ -539,12 +547,19 @@ class _Task(APIGroup):
for subtask in task_results["subtasks"]:
subtask_obj = task_inst.add_subtask(title=subtask.get("title"))
for workload in subtask["workloads"]:
for data in workload["data"]:
if not task_inst.result_has_valid_schema(data):
raise exceptions.RallyException(
"Task %s is trying to import "
"results in wrong format" % task_inst["uuid"])
workload_obj = subtask_obj.add_workload(
name=workload["name"], description=workload["description"],
position=workload["position"], runner=workload["runner"],
runner_type=workload["runner_type"],
context=workload["context"], hooks=workload["hooks"],
sla=workload["sla"], args=workload["args"])
chunk_size = CONF.raw_result_chunk_size
workload_data_count = 0
while len(workload["data"]) > chunk_size:
@@ -554,6 +569,7 @@ class _Task(APIGroup):
workload_obj.add_workload_data(workload_data_count,
{"raw": results_chunk})
workload_data_count += 1
workload_obj.add_workload_data(workload_data_count,
{"raw": workload["data"]})
workload_obj.set_results(
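The while-loop above splits raw iteration data into CONF.raw_result_chunk_size pieces, storing the remainder as a final chunk; the same logic as a standalone sketch (the helper name is hypothetical, not part of this change):

def iter_chunks(data, chunk_size):
    """Hypothetical helper mirroring the chunking loop above."""
    count = 0
    while len(data) > chunk_size:
        yield count, data[:chunk_size]
        data = data[chunk_size:]
        count += 1
    # the remainder, possibly shorter than chunk_size, is stored last
    yield count, data

# list(iter_chunks([1, 2, 3, 4, 5], 2))
# -> [(0, [1, 2]), (1, [3, 4]), (2, [5])]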

View File

@@ -14,6 +14,7 @@
# under the License.
import collections
import copy
import datetime as dt
import uuid
@@ -21,12 +22,163 @@ from rally.common import db
from rally.common import logging
from rally import consts
from rally import exceptions
from rally.task.processing import charts
LOG = logging.getLogger(__name__)
OUTPUT_SCHEMA = {
TASK_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"uuid": {"type": "string"},
"title": {"type": "string"},
"description": {"type": "string"},
"version": {"type": "number"},
"status": {"type": "string"},
"tags": {"type": "array"},
"created_at": {"type": "string"},
"updated_at": {"type": "string"},
"pass_sla": {"type": "boolean"},
"task_duration": {"type": "number"},
"subtasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"uuid": {"type": "string"},
"task_uuid": {"type": "string"},
"title": {"type": "string"},
"description": {"type": "string"},
"status": {"type": "string"},
"pass_sla": {"type": "boolean"},
"run_in_parallel": {"type": "boolean"},
"created_at": {"type": "string"},
"updated_at": {"type": "string"},
"sla": {"type": "object"},
"context": {"type": "object"},
"duration": {"type": "number"},
"workloads": {
"type": "array",
"items": {"$ref": "#/definitions/workload"}
}
},
"required": ["workloads"],
"additionalProperties": False
}
}
},
"required": ["subtasks"],
"additionalProperties": False,
"definitions": {
"workload": {
"type": "object",
"properties": {
"uuid": {"type": "string"},
"task_uuid": {"type": "string"},
"subtask_uuid": {"type": "string"},
"name": {"type": "string"},
"description": {"type": "string"},
"args": {"type": "object"},
"runner": {"type": "object"},
"runner_type": {"type": "string"},
"hooks": {
"type": "array",
"items": {"$ref": "#/definitions/hook_result"}
},
"min_duration": {"type": "number"},
"max_duration": {"type": "number"},
"start_time": {"oneOf": [
{"type": "number",
"description": "The timestamp of load start"},
{"type": "null",
"description": "The load was not started"}]},
"load_duration": {"type": "number"},
"full_duration": {"type": "number"},
"statistics": {
"type": "object",
"properties": {
"durations": {"type": "object"},
"atomics": {"type": "object"}
}
},
"data": {"type": "array"},
"failed_iteration_count": {"type": "integer"},
"total_iteration_count": {"type": "integer"},
"created_at": {"type": "string"},
"updated_at": {"type": "string"},
"context": {"type": "object"},
"position": {"type": "integer"},
"pass_sla": {"type": "boolean"},
"sla_results": {
"type": "object",
"properties": {
"sla": {
"type": "array",
"items": {
"type": "object",
"properties": {
"criterion": {
"type": "string"
},
"detail": {
"type": "string"
},
"success": {
"type": "boolean"
}
}
}
}
}
},
"sla": {"type": "object"}
},
"required": ["pass_sla", "sla_results", "sla", "statistics",
"context", "data", "runner", "args", "full_duration",
"load_duration", "total_iteration_count",
"failed_iteration_count", "position"],
"additionalProperties": False
},
"hook_result": {
"type": "object",
"properties": {
"config": {"type": "object"},
"results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"started_at": {"type": "number"},
"finished_at": {"type": "number"},
"triggered_by": {
"type": "object",
"properties": {
"event_type": {"type": "string"},
"value": {}},
"required": ["event_type", "value"],
"additionalProperties": False
},
"status": {"type": "string"},
"error": {
"type": "array",
"minItems": 3,
"maxItems": 3,
"items": {"type": "string"},
},
"output": {"$ref": "#/definitions/output"},
},
"required": ["finished_at", "triggered_by", "status"],
"additionalProperties": False
}
},
"summary": {"type": "object"}
},
"required": ["config", "results", "summary"],
"additionalProperties": False,
},
"output": {
"type": "object",
"properties": {
"additive": {
@@ -66,11 +218,15 @@ OUTPUT_SCHEMA = {
{"type": "string"},
{"anyOf": [
{"type": "array",
"items": {"type": "array",
"items": [{"type": "number"},
"items": {
"type": "array",
"items": [
{"type": "number"},
{"type": "number"}]
}},
{"type": "number"}]}]}},
{"type": "number"}]
}]
}},
{"type": "object",
"properties": {
"cols": {"type": "array",
@@ -79,13 +235,17 @@ OUTPUT_SCHEMA = {
"type": "array",
"items": {
"type": "array",
"items": {"anyOf": [{"type": "string"},
{"type": "number"}]}}
"items": {
"anyOf": [{"type": "string"},
{"type": "number"}]
}
}
}
},
"required": ["cols", "rows"],
"additionalProperties": False},
{"type": "array", "items": {"type": "string"}},
{"type": "array",
"items": {"type": "string"}},
]},
"label": {"type": "string"},
"axis_label": {"type": "string"}
@@ -98,42 +258,7 @@ OUTPUT_SCHEMA = {
"required": ["additive", "complete"],
"additionalProperties": False
}
HOOK_RUN_RESULT_SCHEMA = {
"type": "object",
"properties": {
"started_at": {"type": "number"},
"finished_at": {"type": "number"},
"triggered_by": {
"type": "object",
"properties": {"event_type": {"type": "string"},
"value": {}},
"required": ["event_type", "value"],
"additionalProperties": False
},
"status": {"type": "string"},
"error": {
"type": "array",
"minItems": 3,
"maxItems": 3,
"items": {"type": "string"},
},
"output": OUTPUT_SCHEMA,
},
"required": ["finished_at", "triggered_by", "status"],
"additionalProperties": False
}
HOOK_RESULTS_SCHEMA = {
"type": "object",
"properties": {
"config": {"type": "object"},
"results": {"type": "array",
"items": HOOK_RUN_RESULT_SCHEMA},
"summary": {"type": "object"}
},
"required": ["config", "results", "summary"],
"additionalProperties": False,
}
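Since TASK_SCHEMA requires only "subtasks" at the top level and forbids unknown keys via "additionalProperties": False, the smallest passing document is an empty subtask list; a quick sketch (assuming TASK_SCHEMA is importable from rally.common.objects.task):

import jsonschema

from rally.common.objects.task import TASK_SCHEMA

jsonschema.validate({"subtasks": []}, TASK_SCHEMA)  # passes

try:
    # unknown key plus missing "subtasks" -> ValidationError
    jsonschema.validate({"foo": 1}, TASK_SCHEMA)
except jsonschema.ValidationError as e:
    print(e.message)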
@@ -265,6 +390,86 @@ class Task(object):
self.update_status(new_status, allowed_statuses=(
consts.TaskStatus.RUNNING, consts.TaskStatus.SOFT_ABORTING))
def result_has_valid_schema(self, result):
"""Check whatever result has valid schema or not."""
# NOTE(boris-42): We can't use here jsonschema, this method is called
# to check every iteration result schema. And this
# method works 200 times faster then jsonschema
# which totally makes sense.
_RESULT_SCHEMA = {
"fields": [("duration", float), ("timestamp", float),
("idle_duration", float), ("output", dict),
("atomic_actions", list), ("error", list)]
}
for key, proper_type in _RESULT_SCHEMA["fields"]:
if key not in result:
LOG.warning("'%s' is not result" % key)
return False
if not isinstance(result[key], proper_type):
LOG.warning(
"Task %(uuid)s | result['%(key)s'] has wrong type "
"'%(actual_type)s', should be '%(proper_type)s'"
% {"uuid": self.task["uuid"],
"key": key,
"actual_type": type(result[key]),
"proper_type": proper_type.__name__})
return False
actions_list = copy.deepcopy(result["atomic_actions"])
for action in actions_list:
for key in ("name", "started_at", "finished_at", "children"):
if key not in action:
LOG.warning(
"Task %(uuid)s | Atomic action %(action)s "
"missing key '%(key)s'"
% {"uuid": self.task["uuid"],
"action": action,
"key": key})
return False
for key in ("started_at", "finished_at"):
if not isinstance(action[key], float):
LOG.warning(
"Task %(uuid)s | Atomic action %(action)s has "
"wrong type '%(type)s', should be 'float'"
% {"uuid": self.task["uuid"],
"action": action,
"type": type(action[key])})
return False
if action["children"]:
actions_list.extend(action["children"])
for e in result["error"]:
if not isinstance(e, str):
LOG.warning("error value has wrong type '%s', should be 'str'"
% type(e))
return False
for key in ("additive", "complete"):
if key not in result["output"]:
LOG.warning("Task %(uuid)s | Output missing key '%(key)s'"
% {"uuid": self.task["uuid"], "key": key})
return False
type_ = type(result["output"][key])
if type_ != list:
LOG.warning(
"Task %(uuid)s | Value of result['output']['%(key)s'] "
"has wrong type '%(type)s', must be 'list'"
% {"uuid": self.task["uuid"],
"key": key, "type": type_.__name__})
return False
for key in result["output"]:
for output_data in result["output"][key]:
message = charts.validate_output(key, output_data)
if message:
LOG.warning("Task %(uuid)s | %(message)s"
% {"uuid": self.task["uuid"],
"message": message})
return False
return True
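For reference, a minimal iteration result that satisfies this check, matching the shapes exercised by the unit tests below (a sketch; `task` stands for any objects.Task instance):

result = {
    "duration": 1.0,       # must be float
    "timestamp": 1.0,      # must be float
    "idle_duration": 1.0,  # must be float
    "output": {"additive": [], "complete": []},  # both keys, both lists
    "atomic_actions": [],  # each action needs name/started_at/finished_at/
                           # children, with float timestamps
    "error": [],           # list of str
}
assert task.result_has_valid_schema(result)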
class Subtask(object):
"""Represents a subtask object."""

View File

@@ -25,7 +25,6 @@ from rally.common import logging
from rally.common.plugin import plugin
from rally.common import utils as rutils
from rally.common import validation
from rally.task.processing import charts
from rally.task import scenario
from rally.task import types
from rally.task import utils
@@ -228,87 +227,6 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin):
self.result_queue.append(sorted_batch)
del self.result_batch[:]
_RESULT_SCHEMA = {
"fields": [("duration", float), ("timestamp", float),
("idle_duration", float), ("output", dict),
("atomic_actions", list), ("error", list)]
}
def _result_has_valid_schema(self, result):
"""Check whatever result has valid schema or not."""
# NOTE(boris-42): We can't use here jsonschema, this method is called
# to check every iteration result schema. And this
# method works 200 times faster then jsonschema
# which totally makes sense.
for key, proper_type in self._RESULT_SCHEMA["fields"]:
if key not in result:
LOG.warning("'%s' is not result" % key)
return False
if not isinstance(result[key], proper_type):
LOG.warning(
"Task %(uuid)s | result['%(key)s'] has wrong type "
"'%(actual_type)s', should be '%(proper_type)s'"
% {"uuid": self.task["uuid"],
"key": key,
"actual_type": type(result[key]),
"proper_type": proper_type.__name__})
return False
actions_list = copy.deepcopy(result["atomic_actions"])
for action in actions_list:
for key in ("name", "started_at", "finished_at", "children"):
if key not in action:
LOG.warning(
"Task %(uuid)s | Atomic action %(action)s "
"missing key '%(key)s'"
% {"uuid": self.task["uuid"],
"action": action,
"key": key})
return False
for key in ("started_at", "finished_at"):
if not isinstance(action[key], float):
LOG.warning(
"Task %(uuid)s | Atomic action %(action)s has "
"wrong type '%(type)s', should be 'float'"
% {"uuid": self.task["uuid"],
"action": action,
"type": type(action[key])})
return False
if action["children"]:
actions_list.extend(action["children"])
for e in result["error"]:
if not isinstance(e, str):
LOG.warning("error value has wrong type '%s', should be 'str'"
% type(e))
return False
for key in ("additive", "complete"):
if key not in result["output"]:
LOG.warning("Task %(uuid)s | Output missing key '%(key)s'"
% {"uuid": self.task["uuid"], "key": key})
return False
type_ = type(result["output"][key])
if type_ != list:
LOG.warning(
"Task %(uuid)s | Value of result['output']['%(key)s'] "
"has wrong type '%(type)s', must be 'list'"
% {"uuid": self.task["uuid"],
"key": key, "type": type_.__name__})
return False
for key in result["output"]:
for output_data in result["output"][key]:
message = charts.validate_output(key, output_data)
if message:
LOG.warning("Task %(uuid)s | %(message)s"
% {"uuid": self.task["uuid"],
"message": message})
return False
return True
def _send_result(self, result):
"""Store partial result to send it to consumer later.
@@ -317,7 +235,7 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin):
ValidationError is raised.
"""
-if not self._result_has_valid_schema(result):
+if not self.task.result_has_valid_schema(result):
LOG.warning(
"Task %(task)s | Runner `%(runner)s` is trying to send "
"results in wrong format"

View File

@@ -274,6 +274,122 @@ class TaskTestCase(test.TestCase):
consts.TaskStatus.SOFT_ABORTING)
)
@ddt.data(
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"output": {"additive": [], "complete": []},
"error": ["err1", "err2"], "atomic_actions": []},
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]},
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": ["a1", "a2"],
"complete": ["c1", "c2"]},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]},
"validate_output_calls": [("additive", "a1"), ("additive", "a2"),
("complete", "c1"), ("complete", "c2")],
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": ["a1", "a2"],
"complete": ["c1", "c2"]},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]},
"validate_output_return_value": "validation error message"},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [42], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 10,
"finished_at": 52, "children": []}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "non-float", "started_at": 1.0,
"children": []}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 4.0,
"children": [{"name": "foo1",
"started_at": 2.0,
"finished_at": 3.0,
"children": []}]}]},
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 4.0,
"children": [{"name": "foo1",
"started_at": 20,
"finished_at": 30,
"children": []}]}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 4.0,
"children": [{"name": "foo1",
"started_at": 2.0,
"finished_at": 3.0}]}]}},
{"data": {"duration": 1, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": "foo", "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {}, "atomic_actions": []}},
{"data": {"timestamp": 1.0, "idle_duration": 1.0, "error": [],
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "idle_duration": 1.0, "error": [],
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "error": [],
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []}}},
{"data": []},
{"data": {}},
{"data": "foo"})
@ddt.unpack
@mock.patch("rally.common.objects.task.LOG")
@mock.patch("rally.common.objects.task.charts.validate_output")
def test_result_has_valid_schema(self, mock_validate_output, mock_log,
data, expected=False,
validate_output_return_value=None,
validate_output_calls=None):
task = objects.Task(task=self.task)
mock_validate_output.return_value = validate_output_return_value
self.assertEqual(expected,
task.result_has_valid_schema(data),
message=repr(data))
if validate_output_calls:
mock_validate_output.assert_has_calls(
[mock.call(*args) for args in validate_output_calls],
any_order=True)
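For reference, ddt generates one test method per @ddt.data entry, and @ddt.unpack splats each dict into keyword arguments, which is how a single test body covers all of the cases above; a minimal sketch of the pattern (the example class and assertion are illustrative):

import unittest

import ddt


@ddt.ddt
class ExampleTestCase(unittest.TestCase):

    @ddt.data({"data": {}, "expected": False},
              {"data": {"duration": 1.0}, "expected": True})
    @ddt.unpack
    def test_has_duration(self, data, expected=False):
        # each dict above becomes its own generated test case
        self.assertEqual(expected, "duration" in data)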
class SubtaskTestCase(test.TestCase):

View File

@@ -284,7 +284,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
def test_run_scenario_constantly_for_duration(self):
runner_obj = constant.ConstantForDurationScenarioRunner(
-None, self.config)
+mock.MagicMock(), self.config)
runner_obj._run_scenario(fakes.FakeScenario, "do_it",
self.context, self.args)
@@ -297,7 +297,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
def test_run_scenario_constantly_for_duration_exception(self):
runner_obj = constant.ConstantForDurationScenarioRunner(
-None, self.config)
+mock.MagicMock(), self.config)
runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong",
self.context, self.args)
@@ -311,7 +311,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
def test_run_scenario_constantly_for_duration_timeout(self):
runner_obj = constant.ConstantForDurationScenarioRunner(
-None, self.config)
+mock.MagicMock(), self.config)
runner_obj._run_scenario(fakes.FakeScenario, "raise_timeout",
self.context, self.args)

View File

@@ -214,137 +214,23 @@ class ScenarioRunnerTestCase(test.TestCase):
scenario_runner._meta_set("name", "FakePlugin_%s" % id(ScenarioRunner))
return scenario_runner
@ddt.data(
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"output": {"additive": [], "complete": []},
"error": ["err1", "err2"], "atomic_actions": []},
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]},
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": ["a1", "a2"],
"complete": ["c1", "c2"]},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]},
"validate_output_calls": [("additive", "a1"), ("additive", "a2"),
("complete", "c1"), ("complete", "c2")],
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": ["a1", "a2"],
"complete": ["c1", "c2"]},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]},
"validate_output_return_value": "validation error message"},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [42], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 5.2, "children": []}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 10,
"finished_at": 52, "children": []}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "non-float", "started_at": 1.0,
"children": []}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 4.0,
"children": [{"name": "foo1",
"started_at": 2.0,
"finished_at": 3.0,
"children": []}]}]},
"expected": True},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 4.0,
"children": [{"name": "foo1",
"started_at": 20,
"finished_at": 30,
"children": []}]}]}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": [{"name": "foo", "started_at": 1.0,
"finished_at": 4.0,
"children": [{"name": "foo1",
"started_at": 2.0,
"finished_at": 3.0}]}]}},
{"data": {"duration": 1, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1,
"error": [], "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": "foo", "output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {}, "atomic_actions": []}},
{"data": {"timestamp": 1.0, "idle_duration": 1.0, "error": [],
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "idle_duration": 1.0, "error": [],
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "error": [],
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"output": {"additive": [], "complete": []},
"atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "atomic_actions": []}},
{"data": {"duration": 1.0, "timestamp": 1.0, "idle_duration": 1.0,
"error": [], "output": {"additive": [], "complete": []}}},
{"data": []},
{"data": {}},
{"data": "foo"})
@ddt.unpack
@mock.patch("rally.task.runner.LOG")
@mock.patch(BASE + "charts.validate_output")
def test__result_has_valid_schema(self, mock_validate_output, mock_log,
data, expected=False,
validate_output_return_value=None,
validate_output_calls=None):
runner_ = self._get_runner(task={"uuid": "foo_uuid"})
mock_validate_output.return_value = validate_output_return_value
self.assertEqual(expected,
runner_._result_has_valid_schema(data),
message=repr(data))
if validate_output_calls:
mock_validate_output.assert_has_calls(
[mock.call(*args) for args in validate_output_calls],
any_order=True)
def test__send_result(self):
runner_ = self._get_runner(task={"uuid": "foo_uuid"})
task = fakes.FakeTask(uuid="foo_uuid")
task.result_has_valid_schema = mock.MagicMock(return_value=True)
runner_ = self._get_runner(task=task)
result = {"timestamp": 42}
runner_._result_has_valid_schema = mock.Mock(return_value=True)
self.assertIsNone(runner_._send_result(result))
self.assertEqual([], runner_.result_batch)
self.assertEqual(collections.deque([[result]]), runner_.result_queue)
@mock.patch("rally.task.runner.LOG")
def test__send_result_with_invalid_schema(self, mock_log):
runner_ = self._get_runner(task={"uuid": "foo_uuid"})
task = fakes.FakeTask(uuid="foo_uuid")
task.result_has_valid_schema = mock.MagicMock(return_value=False)
runner_ = self._get_runner(task=task)
result = {"timestamp": 42}
runner_._result_has_valid_schema = mock.Mock(return_value=False)
self.assertIsNone(runner_._send_result(result))
runner_._result_has_valid_schema.assert_called_once_with(result)
runner_.task.result_has_valid_schema.assert_called_once_with(result)
self.assertTrue(mock_log.warning.called)
self.assertEqual([], runner_.result_batch)
self.assertEqual(collections.deque([]), runner_.result_queue)

View File

@@ -514,13 +514,17 @@ class TaskAPITestCase(test.TestCase):
"load_duration": 1,
"start_time": 23.77,
"position": 77,
"runner": "runner-config",
"runner_type": "runner-type",
"context": "ctx-config",
"hooks": "hooks-config",
"sla": "sla-config",
"sla_results": {"sla": "sla=result"},
"args": "scen-args",
"runner": {},
"runner_type": "",
"context": {},
"hooks": [],
"pass_sla": True,
"sla": {},
"sla_results": {"sla": [{"success": True}]},
"args": {},
"statistics": {},
"total_iteration_count": 3,
"failed_iteration_count": 0,
"data": ["data-raw"]}
task_results = {"subtasks": [
@@ -577,13 +581,17 @@ class TaskAPITestCase(test.TestCase):
"load_duration": 1,
"start_time": 23.77,
"position": 77,
"runner": "runner-config",
"runner_type": "runner-type",
"context": "ctx-config",
"hooks": "hooks-config",
"sla": "sla-config",
"sla_results": {"sla": "sla=result"},
"args": "scen-args",
"runner": {},
"runner_type": "",
"context": {},
"hooks": [],
"pass_sla": True,
"sla": {},
"sla_results": {"sla": [{"success": True}]},
"args": {},
"statistics": {},
"total_iteration_count": 3,
"failed_iteration_count": 0,
"data": [{"timestamp": 1},
{"timestamp": 2},
{"timestamp": 3}]}
@@ -628,8 +636,9 @@ class TaskAPITestCase(test.TestCase):
hooks_results=workload["hooks"], start_time=workload["start_time"])
@mock.patch("rally.api.objects.Deployment.get")
@mock.patch("rally.api.jsonschema.validate", return_value=True)
def test_import_results_with_inconsistent_deployment(
self, mock_deployment_get):
self, mock_jsonschema_validate, mock_deployment_get):
fake_deployment = fakes.FakeDeployment(
uuid="deployment_uuid", admin="fake_admin", users=["fake_user"],
status=consts.DeployStatus.DEPLOY_INCONSISTENT,
@@ -639,9 +648,37 @@ class TaskAPITestCase(test.TestCase):
self.assertRaises(exceptions.DeploymentNotFinishedStatus,
self.task_inst.import_results,
deployment="deployment_uuid",
-task_results=[],
+task_results={},
tags=["tag"])
@mock.patch("rally.api.objects.Deployment.get")
def test_import_results_with_error_jsonschema(
self, mock_deployment_get):
self.assertRaises(exceptions.RallyException,
self.task_inst.import_results,
deployment="deployment_uuid",
task_results={"key": "invalid json"})
@mock.patch("rally.api.objects.Task")
@mock.patch("rally.api.objects.Deployment.get")
@mock.patch("rally.api.jsonschema.validate", return_value=True)
def test_import_results_with_error_data(
self, mock_jsonschema_validate, mock_deployment_get, mock_task):
mock_deployment_get.return_value = fakes.FakeDeployment(
uuid="deployment_uuid", admin="fake_admin", users=["fake_user"],
status=consts.DeployStatus.DEPLOY_FINISHED)
mock_task.return_value.result_has_valid_schema = mock.MagicMock(
return_value=False)
task_results = {"subtasks": [{"title": "subtask-title",
"workloads": [{"data": [{"a": 1}]}]
}]}
self.assertRaises(exceptions.RallyException,
self.task_inst.import_results,
deployment="deployment_uuid",
task_results=task_results)
class BaseDeploymentTestCase(test.TestCase):
def setUp(self):