Merge "Add Workload object"
commit e8ba46705b
@@ -138,7 +138,7 @@ New format JSON schema:
 },
 
 "run_in_parallel": {"type": "boolean"},
-"scenarios": {
+"workloads": {
 "type": "array",
 "items": {
 "type": "object",
@@ -172,7 +172,7 @@ New format JSON schema:
 "type": "object"
 }
 },
-"required": ["title", "scenarios"]
+"required": ["title", "workloads"]
 }
 }
 },
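Note: as an illustration of the renamed schema above (not part of the commit), a minimal subtask dict now looks like this; "title" and "workloads" are the required keys, while the remaining keys are optional and are taken from the test fixtures later in this diff:

    # Illustrative only -- key names follow the schema and fixtures in this diff.
    subtask = {
        "title": "dummy subtask",
        "description": "The first subtask in dummy task",
        "run_in_parallel": False,
        "workloads": [{
            "name": "Dummy.dummy",
            "args": {"sleep": 0},
            "runner": {"type": "serial"}
        }]
    }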
@@ -231,8 +231,8 @@ New format sample:
 run_in_parallel: False
 
 # Single scenario load can be generated by specifying only one element
-# in "scenarios" section.
-scenarios:
+# in "workloads" section.
+workloads:
 -
 # Full name of scenario plugin
 name: "NovaServers.boot_and_delete"
@@ -278,7 +278,7 @@ New format sample:
 # If we put 2 or more scenarios to `scenarios` section we will run
 # all of them simultaneously which allows us to generate more real life
 # load
-scenarios:
+workloads:
 -
 name: "CinderVolumes.create_and_delete"
 args:
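Note: the unchanged comment above still refers to the old `scenarios` key; with the rename, two or more entries in `workloads` are the ones run simultaneously. A rough sketch of such a subtask as a Python dict (illustrative values, not part of the commit):

    # Two workloads in one subtask are executed simultaneously,
    # producing a more realistic mixed load.
    mixed_subtask = {
        "title": "mixed load",
        "workloads": [
            {"name": "NovaServers.boot_and_delete", "args": {}},
            {"name": "CinderVolumes.create_and_delete", "args": {}}
        ]
    }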
@@ -196,8 +196,8 @@ class TaskEngine(object):
 
         specified = set()
         for subtask in config.subtasks:
-            for s in subtask.scenarios:
-                specified.add(s["name"])
+            for s in subtask.workloads:
+                specified.add(s.name)
 
         if not specified.issubset(available):
             names = ", ".join(specified - available)
@@ -206,29 +206,27 @@ class TaskEngine(object):
     @logging.log_task_wrapper(LOG.info, _("Task validation of syntax."))
     def _validate_config_syntax(self, config):
         for subtask in config.subtasks:
-            for pos, scenario_obj in enumerate(subtask.scenarios):
+            for pos, workload in enumerate(subtask.workloads):
                 try:
-                    runner.ScenarioRunner.validate(
-                        scenario_obj.get("runner", {}))
+                    runner.ScenarioRunner.validate(workload.runner)
                     context.ContextManager.validate(
-                        scenario_obj.get("context", {}), non_hidden=True)
-                    sla.SLA.validate(scenario_obj.get("sla", {}))
+                        workload.context, non_hidden=True)
+                    sla.SLA.validate(workload.sla)
                 except (exceptions.RallyException,
                         jsonschema.ValidationError) as e:
-                    raise exceptions.InvalidTaskConfig(
-                        name=scenario_obj["name"],
-                        pos=pos, config=scenario_obj,
-                        reason=six.text_type(e)
-                    )
+                    kw = workload.make_exception_args(
+                        pos, six.text_type(e))
+                    raise exceptions.InvalidTaskConfig(**kw)
 
-    def _validate_config_semantic_helper(self, admin, user, name, pos,
-                                         deployment, kwargs):
+    def _validate_config_semantic_helper(self, admin, user, workload, pos,
+                                         deployment):
         try:
             scenario.Scenario.validate(
-                name, kwargs, admin=admin, users=[user], deployment=deployment)
+                workload.name, workload.to_dict(),
+                admin=admin, users=[user], deployment=deployment)
         except exceptions.InvalidScenarioArgument as e:
-            kw = {"name": name, "pos": pos,
-                  "config": kwargs, "reason": six.text_type(e)}
+            kw = workload.make_exception_args(pos, six.text_type(e))
             raise exceptions.InvalidTaskConfig(**kw)
 
     def _get_user_ctx_for_validation(self, ctx):
@@ -260,10 +258,10 @@ class TaskEngine(object):
             for u in ctx_conf["users"]:
                 user = osclients.Clients(u["credential"])
                 for subtask in config.subtasks:
-                    for pos, scenario_obj in enumerate(subtask.scenarios):
+                    for pos, workload in enumerate(subtask.workloads):
                         self._validate_config_semantic_helper(
-                            admin, user, scenario_obj["name"],
-                            pos, deployment, scenario_obj)
+                            admin, user, workload,
+                            pos, deployment)
 
     @logging.log_task_wrapper(LOG.info, _("Task validation."))
     def validate(self):
@@ -279,8 +277,8 @@ class TaskEngine(object):
             raise exceptions.InvalidTaskException(str(e))
 
     def _get_runner(self, config):
-        conf = config.get("runner", {"type": "serial"})
-        return runner.ScenarioRunner.get(conf["type"])(self.task, conf)
+        config = config or {"type": "serial"}
+        return runner.ScenarioRunner.get(config["type"])(self.task, config)
 
     def _prepare_context(self, ctx, name, credential):
         scenario_context = copy.deepcopy(
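Note: `_get_runner` now receives the workload's runner section directly instead of the whole scenario dict. A standalone sketch of the fallback behaviour (illustrative, not Rally code):

    # Workload defaults a missing "runner" section to {}, and _get_runner
    # treats a falsy config as the serial runner.
    workload_cfg = {"name": "Dummy.dummy"}           # no "runner" key given
    runner_cfg = workload_cfg.get("runner", {})      # {} on the Workload object
    runner_cfg = runner_cfg or {"type": "serial"}    # fallback inside _get_runner
    print(runner_cfg["type"])                        # -> serial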
@@ -312,7 +310,7 @@ class TaskEngine(object):
         self.task.update_status(consts.TaskStatus.RUNNING)
 
         for subtask in self.config.subtasks:
-            for pos, scenario_obj in enumerate(subtask.scenarios):
+            for pos, workload in enumerate(subtask.workloads):
 
                 if ResultConsumer.is_task_in_aborting_status(
                         self.task["uuid"]):
@@ -320,19 +318,18 @@ class TaskEngine(object):
                     self.task.update_status(consts.TaskStatus.ABORTED)
                     return
 
-                name = scenario_obj["name"]
-                key = {"name": name, "pos": pos, "kw": scenario_obj}
+                key = workload.make_key(pos)
                 LOG.info("Running benchmark with key: \n%s"
                          % json.dumps(key, indent=2))
-                runner_obj = self._get_runner(scenario_obj)
+                runner_obj = self._get_runner(workload.runner)
                 context_obj = self._prepare_context(
-                    scenario_obj.get("context", {}), name, self.admin)
+                    workload.context, workload.name, self.admin)
                 try:
                     with ResultConsumer(key, self.task, runner_obj,
                                         self.abort_on_sla_failure):
                         with context.ContextManager(context_obj):
-                            runner_obj.run(name, context_obj,
-                                           scenario_obj.get("args", {}))
+                            runner_obj.run(workload.name, context_obj,
+                                           workload.args)
                 except Exception as e:
                     LOG.exception(e)
 
@@ -397,7 +394,7 @@ class TaskConfig(object):
 },
 
 "run_in_parallel": {"type": "boolean"},
-"scenarios": {
+"workloads": {
 "type": "array",
 "minItems": 1,
 "maxItems": 1,
@@ -424,7 +421,7 @@ class TaskConfig(object):
 }
 },
 "additionalProperties": False,
-"required": ["title", "scenarios"]
+"required": ["title", "workloads"]
 }
 }
 },
@@ -474,12 +471,12 @@ class TaskConfig(object):
             return [SubTask(s) for s in config["subtasks"]]
         elif self.version == 1:
             subtasks = []
-            for name, v1_scenarios in six.iteritems(config):
-                for v1_scenario in v1_scenarios:
-                    v2_scenario = copy.deepcopy(v1_scenario)
-                    v2_scenario["name"] = name
+            for name, v1_workloads in six.iteritems(config):
+                for v1_workload in v1_workloads:
+                    v2_workload = copy.deepcopy(v1_workload)
+                    v2_workload["name"] = name
                     subtasks.append(
-                        SubTask({"title": name, "scenarios": [v2_scenario]}))
+                        SubTask({"title": name, "workloads": [v2_workload]}))
             return subtasks
 
 
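Note: a standalone sketch of what this v1-to-v2 conversion produces (illustrative input; mirrors the loop above, not the Rally code itself):

    import copy

    # Old (v1) format: scenario name -> list of workload configs.
    v1_config = {"Dummy.dummy": [{"args": {"sleep": 0}}, {"args": {"sleep": 1}}]}

    subtasks = []
    for name, v1_workloads in v1_config.items():
        for v1_workload in v1_workloads:
            v2_workload = copy.deepcopy(v1_workload)
            v2_workload["name"] = name
            subtasks.append({"title": name, "workloads": [v2_workload]})

    # subtasks[0] == {"title": "Dummy.dummy",
    #                 "workloads": [{"args": {"sleep": 0}, "name": "Dummy.dummy"}]}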
@@ -496,5 +493,59 @@ class SubTask(object):
         self.tags = config.get("tags", [])
         self.group = config.get("group")
         self.description = config.get("description")
-        self.scenarios = config["scenarios"]
+        self.workloads = [Workload(wconf)
+                          for wconf
+                          in config["workloads"]]
         self.context = config.get("context", {})
+
+
+class Workload(object):
+    """Workload -- workload configuration in SubTask.
+
+    """
+    def __init__(self, config):
+        self.name = config["name"]
+        self.runner = config.get("runner", {})
+        self.sla = config.get("sla", {})
+        self.context = config.get("context", {})
+        self.args = config.get("args", {})
+
+    def to_dict(self):
+        workload = {"runner": self.runner}
+
+        for prop in "sla", "args", "context":
+            value = getattr(self, prop)
+            if value:
+                workload[prop] = value
+
+        return workload
+
+    def to_task(self):
+        """Make task configuration for the workload.
+
+        This method returns a dict representing full configuration
+        of the task containing a single subtask with this single
+        workload.
+
+        :return: dict containing full task configuration
+        """
+        # NOTE(ikhudoshyn): Result of this method will be used
+        # to store full task configuration in DB so that
+        # subtask configuration in reports would be given
+        # in the same format as it was provided by user.
+        # Temporarily it returns to_dict() in order not
+        # to break existing reports. It should be
+        # properly implemented in a patch that will update reports.
+        # return {self.name: [self.to_dict()]}
+        return self.to_dict()
+
+    def make_key(self, pos):
+        return {"name": self.name,
+                "pos": pos,
+                "kw": self.to_task()}
+
+    def make_exception_args(self, pos, reason):
+        return {"name": self.name,
+                "pos": pos,
+                "config": self.to_dict(),
+                "reason": reason}
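Note: a short usage sketch of the new helpers (values illustrative; the behaviour matches WorkloadTestCase at the end of this diff):

    workload = Workload({"name": "Dummy.dummy",
                         "args": {"sleep": 0},
                         "runner": {"type": "serial"}})

    workload.to_dict()
    # -> {"runner": {"type": "serial"}, "args": {"sleep": 0}}
    #    empty "sla"/"context" sections are dropped, "runner" is always kept

    workload.make_key(0)
    # -> {"name": "Dummy.dummy", "pos": 0,
    #     "kw": {"runner": {"type": "serial"}, "args": {"sleep": 0}}}

    workload.make_exception_args(0, "boom")
    # -> {"name": "Dummy.dummy", "pos": 0, "reason": "boom",
    #     "config": {"runner": {"type": "serial"}, "args": {"sleep": 0}}}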
@@ -55,7 +55,7 @@ class TaskTestCase(unittest.TestCase):
             "description": "The first subtask in dummy task",
             "tags": ["dummy", "functional_test"],
             "run_in_parallel": False,
-            "scenarios": [{
+            "workloads": [{
                 "name": "Dummy.dummy",
                 "args": {
                     "sleep": 0
@@ -79,7 +79,7 @@ class TaskTestCase(unittest.TestCase):
             "description": "The second subtask in dummy task",
             "tags": ["dummy", "functional_test"],
             "run_in_parallel": False,
-            "scenarios": [{
+            "workloads": [{
                 "name": "Dummy.dummy",
                 "args": {
                     "sleep": 1
@@ -113,9 +113,9 @@ class TaskEngineTestCase(test.TestCase):
 
         mock_task_instance = mock.MagicMock()
         mock_subtask = mock.MagicMock()
-        mock_subtask.scenarios = [
-            {"name": "a"},
-            {"name": "b"}
+        mock_subtask.workloads = [
+            engine.Workload({"name": "a"}),
+            engine.Workload({"name": "b"})
         ]
         mock_task_instance.subtasks = [mock_subtask]
 
@@ -134,10 +134,10 @@ class TaskEngineTestCase(test.TestCase):
 
         mock_task_instance = mock.MagicMock()
         mock_subtask = mock.MagicMock()
-        mock_subtask.scenarios = [
-            {"name": "exist"},
-            {"name": "nonexist1"},
-            {"name": "nonexist2"}
+        mock_subtask.workloads = [
+            engine.Workload({"name": "exist"}),
+            engine.Workload({"name": "nonexist1"}),
+            engine.Workload({"name": "nonexist2"})
         ]
         mock_task_instance.subtasks = [mock_subtask]
         mock_scenario.list_benchmark_scenarios.return_value = ["exist", "aaa"]
@@ -157,9 +157,9 @@ class TaskEngineTestCase(test.TestCase):
     ):
         mock_task_instance = mock.MagicMock()
         mock_subtask = mock.MagicMock()
-        mock_subtask.scenarios = [
-            {"name": "sca", "context": "a"},
-            {"name": "scb", "runner": "b"}
+        mock_subtask.workloads = [
+            engine.Workload({"name": "sca", "context": "a"}),
+            engine.Workload({"name": "sca", "runner": "b"})
         ]
         mock_task_instance.subtasks = [mock_subtask]
         eng = engine.TaskEngine(mock.MagicMock(), mock.MagicMock())
@@ -178,9 +178,9 @@ class TaskEngineTestCase(test.TestCase):
             mock_scenario_runner, mock_task_config):
         mock_task_instance = mock.MagicMock()
         mock_subtask = mock.MagicMock()
-        mock_subtask.scenarios = [
-            {"name": "sca", "context": "a"},
-            {"name": "scb", "runner": "b"}
+        mock_subtask.workloads = [
+            engine.Workload({"name": "sca", "context": "a"}),
+            engine.Workload({"name": "sca", "runner": "b"})
         ]
         mock_task_instance.subtasks = [mock_subtask]
         eng = engine.TaskEngine(mock.MagicMock(), mock.MagicMock())
@@ -198,9 +198,9 @@ class TaskEngineTestCase(test.TestCase):
             mock_task_config):
         mock_task_instance = mock.MagicMock()
         mock_subtask = mock.MagicMock()
-        mock_subtask.scenarios = [
-            {"name": "sca", "context": "a"},
-            {"name": "scb", "runner": "b"}
+        mock_subtask.workloads = [
+            engine.Workload({"name": "sca", "context": "a"}),
+            engine.Workload({"name": "sca", "runner": "b"})
         ]
         mock_task_instance.subtasks = [mock_subtask]
         eng = engine.TaskEngine(mock.MagicMock(), mock.MagicMock())
@@ -216,10 +216,13 @@ class TaskEngineTestCase(test.TestCase):
             mock_task_config):
         deployment = mock.MagicMock()
         eng = engine.TaskEngine(mock.MagicMock(), mock.MagicMock())
-        eng._validate_config_semantic_helper("admin", "user", "name", "pos",
-                                             deployment, {"args": "args"})
+        workload = engine.Workload(
+            {"name": "name", "runner": "runner", "args": "args"})
+        eng._validate_config_semantic_helper("admin", "user", workload,
+                                             "pos", deployment)
         mock_scenario_validate.assert_called_once_with(
-            "name", {"args": "args"}, admin="admin", users=["user"],
+            "name", {"runner": "runner", "args": "args"},
+            admin="admin", users=["user"],
             deployment=deployment)
 
     @mock.patch("rally.task.engine.TaskConfig")
@@ -229,9 +232,10 @@ class TaskEngineTestCase(test.TestCase):
             self, mock_scenario_validate, mock_task_config):
         eng = engine.TaskEngine(mock.MagicMock(), mock.MagicMock())
 
+        workload = engine.Workload({"name": "name"})
         self.assertRaises(exceptions.InvalidTaskConfig,
-                          eng._validate_config_semantic_helper, "a", "u", "n",
-                          "p", mock.MagicMock(), {})
+                          eng._validate_config_semantic_helper, "a", "u",
+                          workload, "p", mock.MagicMock())
 
     @mock.patch("rally.task.engine.TaskConfig")
     @mock.patch("rally.task.engine.existing_users.ExistingUsers")
@@ -267,15 +271,13 @@ class TaskEngineTestCase(test.TestCase):
 
         mock_task_instance = mock.MagicMock()
         mock_subtask1 = mock.MagicMock()
-        mock_subtask1.scenarios = [
-            {"name": "a", "kw": 0},
-            {"name": "a", "kw": 1}
-        ]
+        wconf1 = engine.Workload({"name": "a", "runner": "ra"})
+        wconf2 = engine.Workload({"name": "a", "runner": "rb"})
+        mock_subtask1.workloads = [wconf1, wconf2]
 
         mock_subtask2 = mock.MagicMock()
-        mock_subtask2.scenarios = [
-            {"name": "b", "kw": 0},
-        ]
+        wconf3 = engine.Workload({"name": "b", "runner": "ra"})
+        mock_subtask2.workloads = [wconf3]
 
         mock_task_instance.subtasks = [mock_subtask1, mock_subtask2]
         fake_task = mock.MagicMock()
@@ -296,12 +298,9 @@ class TaskEngineTestCase(test.TestCase):
         admin = user = mock_clients.return_value
         fake_deployment = mock_deployment_get.return_value
         expected_calls = [
-            mock.call(admin, user, "a", 0, fake_deployment,
-                      {"name": "a", "kw": 0}),
-            mock.call(admin, user, "a", 1, fake_deployment,
-                      {"name": "a", "kw": 1}),
-            mock.call(admin, user, "b", 0, fake_deployment,
-                      {"name": "b", "kw": 0})
+            mock.call(admin, user, wconf1, 0, fake_deployment),
+            mock.call(admin, user, wconf2, 1, fake_deployment),
+            mock.call(admin, user, wconf3, 0, fake_deployment)
         ]
         mock__validate_config_semantic_helper.assert_has_calls(
             expected_calls, any_order=True)
@@ -345,9 +344,11 @@ class TaskEngineTestCase(test.TestCase):
 
         mock_task_instance = mock.MagicMock()
         mock_subtask = mock.MagicMock()
-        mock_subtask.scenarios = [
-            {"name": "a.task", "context": {"context_a": {"a": 1}}},
-            {"name": "b.task", "context": {"context_b": {"b": 2}}}
+        mock_subtask.workloads = [
+            engine.Workload(
+                {"name": "a.task", "context": {"context_a": {"a": 1}}}),
+            engine.Workload(
+                {"name": "b.task", "context": {"context_b": {"b": 2}}})
         ]
         mock_task_instance.subtasks = [mock_subtask]
 
@@ -699,15 +700,15 @@ class TaskTestCase(test.TestCase):
         mock_sub_task.assert_has_calls([
             mock.call({
                 "title": "a.task",
-                "scenarios": [{"s": 1, "name": "a.task"}]
+                "workloads": [{"s": 1, "name": "a.task"}]
             }),
             mock.call({
                 "title": "a.task",
-                "scenarios": [{"s": 2, "name": "a.task"}]
+                "workloads": [{"s": 2, "name": "a.task"}]
             }),
             mock.call({
                 "title": "b.task",
-                "scenarios": [{"s": 3, "name": "b.task"}]
+                "workloads": [{"s": 3, "name": "b.task"}]
             })
         ], any_order=True)
 
@@ -724,3 +725,67 @@ class TaskTestCase(test.TestCase):
         mock_sub_task.assert_has_calls([
             mock.call(subtask_conf1),
             mock.call(subtask_conf2)])
+
+
+class WorkloadTestCase(test.TestCase):
+
+    def setUp(self):
+        super(WorkloadTestCase, self).setUp()
+
+        self.wconf = engine.Workload({
+            "name": "n",
+            "runner": "r",
+            "context": "c",
+            "sla": "s",
+            "args": "a"
+        })
+
+    def test_to_dict(self):
+        expected_dict = {
+            "runner": "r",
+            "context": "c",
+            "sla": "s",
+            "args": "a"
+        }
+
+        self.assertEqual(expected_dict, self.wconf.to_dict())
+
+    def test_to_task(self):
+        expected_dict = {
+            "runner": "r",
+            "context": "c",
+            "sla": "s",
+            "args": "a"
+        }
+
+        self.assertEqual(expected_dict, self.wconf.to_task())
+
+    def test_make_key(self):
+        expected_key = {
+            "name": "n",
+            "pos": "p",
+            "kw": {
+                "runner": "r",
+                "context": "c",
+                "sla": "s",
+                "args": "a"
+            }
+        }
+
+        self.assertEqual(expected_key, self.wconf.make_key("p"))
+
+    def test_make_exception_args(self):
+        expected_args = {
+            "name": "n",
+            "pos": "p",
+            "reason": "r",
+            "config": {
+                "runner": "r",
+                "context": "c",
+                "sla": "s",
+                "args": "a"
+            }
+        }
+
+        self.assertEqual(expected_args,
+                         self.wconf.make_exception_args("p", "r"))