From 1587efbcc1657e9fdb29b5c128284e82d19433bb Mon Sep 17 00:00:00 2001 From: Anton Studenov Date: Wed, 28 Dec 2016 21:07:24 +0300 Subject: [PATCH] Save raw task results in chunks * Split task results into chunks with size configured by raw_result_chunk_size parameter * Removed load_finished event as not used anywhere Change-Id: I09841d02c60ab66eebd851bdf1c0a6d5f9e5a7be --- etc/rally/rally.conf.sample | 12 ++-- rally-jobs/rally.yaml | 11 +++ rally/common/db/api.py | 6 +- rally/common/db/sqlalchemy/api.py | 32 +++++---- rally/common/objects/task.py | 5 +- rally/common/opts.py | 4 +- rally/task/engine.py | 33 +++++++-- tests/unit/common/db/test_api.py | 94 +++++++++++++++++++++++--- tests/unit/common/objects/test_task.py | 5 +- tests/unit/task/test_engine.py | 58 +++++++++++++++- 10 files changed, 220 insertions(+), 40 deletions(-) diff --git a/etc/rally/rally.conf.sample b/etc/rally/rally.conf.sample index c2118c6b4e..300eb5fe5b 100644 --- a/etc/rally/rally.conf.sample +++ b/etc/rally/rally.conf.sample @@ -111,6 +111,10 @@ # value) #openstack_client_http_timeout = 180.0 +# Size of raw result chunk in iterations (integer value) +# Minimum value: 1 +#raw_result_chunk_size = 1000 + [benchmark] @@ -242,12 +246,12 @@ # the status. (floating point value) #magnum_cluster_create_prepoll_delay = 5.0 -# Time(in sec) to wait for magnum cluster to be created. (floating point -# value) +# Time(in sec) to wait for magnum cluster to be created. (floating +# point value) #magnum_cluster_create_timeout = 1200.0 -# Time interval(in sec) between checks when waiting for cluster creation. -# (floating point value) +# Time interval(in sec) between checks when waiting for cluster +# creation. (floating point value) #magnum_cluster_create_poll_interval = 1.0 # Delay between creating Manila share and polling for its status. diff --git a/rally-jobs/rally.yaml b/rally-jobs/rally.yaml index e59e8bf1d9..b6c36819ea 100644 --- a/rally-jobs/rally.yaml +++ b/rally-jobs/rally.yaml @@ -556,6 +556,17 @@ failure_rate: max: 0 + - + args: + sleep: 0 + runner: + type: "constant" + times: 4500 + concurrency: 20 + sla: + failure_rate: + max: 0 + Dummy.dummy_exception: - args: diff --git a/rally/common/db/api.py b/rally/common/db/api.py index 6cceccfd96..89a2caa6ee 100644 --- a/rally/common/db/api.py +++ b/rally/common/db/api.py @@ -257,15 +257,17 @@ def workload_create(task_uuid, subtask_uuid, key): return get_impl().workload_create(task_uuid, subtask_uuid, key) -def workload_data_create(task_uuid, workload_uuid, data): +def workload_data_create(task_uuid, workload_uuid, chunk_order, data): """Create a workload data. :param task_uuid: string with UUID of Task instance. :param workload_uuid: string with UUID of Workload instance. + :param chunk_order: ordinal index of workload data :param data: dict with record values on the workload data. :returns: a dict with data on the workload data. """ - return get_impl().workload_data_create(task_uuid, workload_uuid, data) + return get_impl().workload_data_create(task_uuid, workload_uuid, + chunk_order, data) def workload_set_results(workload_uuid, data): diff --git a/rally/common/db/sqlalchemy/api.py b/rally/common/db/sqlalchemy/api.py index 4944771f48..a2ed5644ae 100644 --- a/rally/common/db/sqlalchemy/api.py +++ b/rally/common/db/sqlalchemy/api.py @@ -242,7 +242,10 @@ class Connection(object): "verification_log": json.dumps(task.validation_result) } - def _make_old_task_result(self, workload, workload_data): + def _make_old_task_result(self, workload, workload_data_list): + raw_data = [data + for workload_data in workload_data_list + for data in workload_data.chunk_data["raw"]] return { "id": workload.id, "task_uuid": workload.task_uuid, @@ -260,7 +263,7 @@ class Connection(object): } }, "data": { - "raw": workload_data.chunk_data["raw"], + "raw": raw_data, "load_duration": workload.load_duration, "full_duration": workload.full_duration, "sla": workload.sla_results["sla"], @@ -268,6 +271,11 @@ class Connection(object): } } + def _task_workload_data_get_all(self, workload_uuid): + return (self.model_query(models.WorkloadData). + filter_by(workload_uuid=workload_uuid). + order_by(models.WorkloadData.chunk_order.asc())) + # @db_api.serialize def task_get(self, uuid): task = self._task_get(uuid) @@ -407,13 +415,11 @@ class Connection(object): filter_by(task_uuid=uuid).all()) for workload in workloads: - workload_data = (self.model_query(models.WorkloadData). - filter_by(task_uuid=uuid, - workload_uuid=workload.uuid). - first()) + workload_data_list = self._task_workload_data_get_all( + workload.uuid) results.append( - self._make_old_task_result(workload, workload_data)) + self._make_old_task_result(workload, workload_data_list)) return results @@ -451,7 +457,8 @@ class Connection(object): return workload @db_api.serialize - def workload_data_create(self, task_uuid, workload_uuid, data): + def workload_data_create(self, task_uuid, workload_uuid, chunk_order, + data): workload_data = models.WorkloadData(task_uuid=task_uuid, workload_uuid=workload_uuid) @@ -485,7 +492,7 @@ class Connection(object): workload_data.update({ "task_uuid": task_uuid, "workload_uuid": workload_uuid, - "chunk_order": 0, + "chunk_order": chunk_order, "iteration_count": iter_count, "failed_iteration_count": failed_iter_count, "chunk_data": {"raw": raw_data}, @@ -503,10 +510,11 @@ class Connection(object): workload = self.model_query(models.Workload).filter_by( uuid=workload_uuid).first() - workload_data = self.model_query(models.WorkloadData).filter_by( - workload_uuid=workload_uuid).first() + workload_data_list = self._task_workload_data_get_all(workload.uuid) - raw_data = workload_data.chunk_data.get("raw", []) + raw_data = [raw + for workload_data in workload_data_list + for raw in workload_data.chunk_data["raw"]] iter_count = len(raw_data) failed_iter_count = 0 diff --git a/rally/common/objects/task.py b/rally/common/objects/task.py index ccbab454e7..1ecedab120 100644 --- a/rally/common/objects/task.py +++ b/rally/common/objects/task.py @@ -569,9 +569,10 @@ class Workload(object): def __getitem__(self, key): return self.workload[key] - def add_workload_data(self, workload_data): + def add_workload_data(self, chunk_order, workload_data): db.workload_data_create(self.workload["task_uuid"], - self.workload["uuid"], workload_data) + self.workload["uuid"], chunk_order, + workload_data) def set_results(self, data): db.workload_set_results(self.workload["uuid"], data) diff --git a/rally/common/opts.py b/rally/common/opts.py index 036bb68287..10a5bd8f42 100644 --- a/rally/common/opts.py +++ b/rally/common/opts.py @@ -34,13 +34,15 @@ from rally.plugins.openstack.scenarios.vm import utils as vm_utils from rally.plugins.openstack.scenarios.watcher import utils as watcher_utils from rally.plugins.openstack.verification.tempest import config as tempest_conf from rally.plugins.openstack.wrappers import glance as glance_utils +from rally.task import engine def list_opts(): return [ ("DEFAULT", itertools.chain(logging.DEBUG_OPTS, - osclients.OSCLIENTS_OPTS)), + osclients.OSCLIENTS_OPTS, + engine.TASK_ENGINE_OPTS)), ("benchmark", itertools.chain(cinder_utils.CINDER_BENCHMARK_OPTS, ec2_utils.EC2_BENCHMARK_OPTS, diff --git a/rally/task/engine.py b/rally/task/engine.py index b36f3cd06b..5cf41b9905 100644 --- a/rally/task/engine.py +++ b/rally/task/engine.py @@ -20,6 +20,7 @@ import time import traceback import jsonschema +from oslo_config import cfg import six from rally.common.i18n import _ @@ -40,6 +41,14 @@ from rally.task import sla LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +TASK_ENGINE_OPTS = [ + cfg.IntOpt("raw_result_chunk_size", default=1000, min=1, + help="Size of raw result chunk in iterations"), +] +CONF.register_opts(TASK_ENGINE_OPTS) + class ResultConsumer(object): """ResultConsumer class stores results from ScenarioRunner, checks SLA. @@ -69,6 +78,7 @@ class ResultConsumer(object): self.runner = runner self.load_started_at = float("inf") self.load_finished_at = 0 + self.workload_data_count = 0 self.sla_checker = sla.SLAChecker(key["kw"]) self.hook_executor = hook.HookExecutor(key["kw"], self.task) @@ -109,6 +119,17 @@ class ResultConsumer(object): self.task.update_status( consts.TaskStatus.SOFT_ABORTING) task_aborted = True + + # save results chunks + chunk_size = CONF.raw_result_chunk_size + while len(self.results) >= chunk_size: + results_chunk = self.results[:chunk_size] + self.results = self.results[chunk_size:] + results_chunk.sort(key=lambda x: x["timestamp"]) + self.workload.add_workload_data(self.workload_data_count, + {"raw": results_chunk}) + self.workload_data_count += 1 + elif self.is_done.isSet(): break else: @@ -136,9 +157,6 @@ class ResultConsumer(object): self.task["uuid"]) == consts.TaskStatus.ABORTED: self.sla_checker.set_aborted_manually() - # NOTE(boris-42): Sort in order of starting instead of order of ending - self.results.sort(key=lambda x: x["timestamp"]) - load_duration = max(self.load_finished_at - self.load_started_at, 0) LOG.info("Load duration is: %s" % utils.format_float_to_str( @@ -153,12 +171,17 @@ class ResultConsumer(object): "full_duration": self.finish - self.start, "sla": self.sla_checker.results(), } - self.runner.send_event(type="load_finished", value=results) if "hooks" in self.key["kw"]: self.event_thread.join() results["hooks"] = self.hook_executor.results() - self.workload.add_workload_data({"raw": self.results}) + if self.results: + # NOTE(boris-42): Sort in order of starting + # instead of order of ending + self.results.sort(key=lambda x: x["timestamp"]) + self.workload.add_workload_data(self.workload_data_count, + {"raw": self.results}) + self.workload.set_results(results) @staticmethod diff --git a/tests/unit/common/db/test_api.py b/tests/unit/common/db/test_api.py index 10fa62124f..a909e436e9 100644 --- a/tests/unit/common/db/test_api.py +++ b/tests/unit/common/db/test_api.py @@ -254,7 +254,7 @@ class TasksTestCase(test.DBTestCase): } subtask = db.subtask_create(task_id, title="foo") workload = db.workload_create(task_id, subtask["uuid"], key) - db.workload_data_create(task_id, workload["uuid"], {"raw": []}) + db.workload_data_create(task_id, workload["uuid"], 0, {"raw": []}) db.workload_set_results(workload["uuid"], data) res = db.task_result_get_all_by_uuid(task_id) @@ -311,7 +311,7 @@ class TasksTestCase(test.DBTestCase): data["sla"][0] = {"success": True} subtask = db.subtask_create(task_id, title="foo") workload = db.workload_create(task_id, subtask["uuid"], key) - db.workload_data_create(task_id, workload["uuid"], {"raw": []}) + db.workload_data_create(task_id, workload["uuid"], 0, {"raw": []}) db.workload_set_results(workload["uuid"], data) for task_id in (task1, task2): @@ -355,7 +355,8 @@ class TasksTestCase(test.DBTestCase): subtask = db.subtask_create(task1["uuid"], title="foo") workload = db.workload_create(task1["uuid"], subtask["uuid"], key) - db.workload_data_create(task1["uuid"], workload["uuid"], {"raw": []}) + db.workload_data_create( + task1["uuid"], workload["uuid"], 0, {"raw": []}) db.workload_set_results(workload["uuid"], data) task1_full = db.task_get_detailed(task1["uuid"]) @@ -403,7 +404,8 @@ class TasksTestCase(test.DBTestCase): subtask = db.subtask_create(task1["uuid"], title="foo") workload = db.workload_create(task1["uuid"], subtask["uuid"], key) - db.workload_data_create(task1["uuid"], workload["uuid"], {"raw": []}) + db.workload_data_create( + task1["uuid"], workload["uuid"], 0, {"raw": []}) db.workload_set_results(workload["uuid"], data) task1_full = db.task_get_detailed_last() @@ -463,7 +465,7 @@ class TasksTestCase(test.DBTestCase): subtask = db.subtask_create(task_id, title="foo") workload = db.workload_create(task_id, subtask["uuid"], key) - db.workload_data_create(task_id, workload["uuid"], raw_data) + db.workload_data_create(task_id, workload["uuid"], 0, raw_data) db.workload_set_results(workload["uuid"], data) res = db.task_result_get_all_by_uuid(task_id) @@ -471,6 +473,80 @@ class TasksTestCase(test.DBTestCase): self.assertEqual(raw_data["raw"], res[0]["data"]["raw"]) self.assertEqual(key, res[0]["key"]) + def test_task_multiple_raw_result_create(self): + task_id = self._create_task()["uuid"] + key = { + "name": "atata", + "pos": 0, + "kw": { + "args": {"a": "A"}, + "context": {"c": "C"}, + "sla": {"s": "S"}, + "runner": {"r": "R", "type": "T"}, + "hooks": [], + } + } + + subtask = db.subtask_create(task_id, title="foo") + workload = db.workload_create(task_id, subtask["uuid"], key) + + db.workload_data_create(task_id, workload["uuid"], 0, { + "raw": [ + {"error": "anError", "timestamp": 10, "duration": 1}, + {"duration": 1, "timestamp": 10, "duration": 1}, + {"duration": 2, "timestamp": 10, "duration": 1}, + {"duration": 3, "timestamp": 10, "duration": 1}, + ], + }) + + db.workload_data_create(task_id, workload["uuid"], 1, { + "raw": [ + {"error": "anError2", "timestamp": 10, "duration": 1}, + {"duration": 6, "timestamp": 10, "duration": 1}, + {"duration": 5, "timestamp": 10, "duration": 1}, + {"duration": 4, "timestamp": 10, "duration": 1}, + ], + }) + + db.workload_data_create(task_id, workload["uuid"], 2, { + "raw": [ + {"duration": 7, "timestamp": 10, "duration": 1}, + {"duration": 8, "timestamp": 10, "duration": 1}, + ], + }) + + db.workload_set_results(workload["uuid"], { + "sla": [{"success": True}], + "load_duration": 13, + "full_duration": 42 + }) + + res = db.task_result_get_all_by_uuid(task_id) + self.assertEqual(len(res), 1) + self.assertEqual(res[0]["key"], key) + self.assertEqual(res[0]["data"], { + "raw": [ + {"error": "anError", "timestamp": 10, "duration": 1}, + {"duration": 1, "timestamp": 10, "duration": 1}, + {"duration": 2, "timestamp": 10, "duration": 1}, + {"duration": 3, "timestamp": 10, "duration": 1}, + {"error": "anError2", "timestamp": 10, "duration": 1}, + {"duration": 6, "timestamp": 10, "duration": 1}, + {"duration": 5, "timestamp": 10, "duration": 1}, + {"duration": 4, "timestamp": 10, "duration": 1}, + {"duration": 7, "timestamp": 10, "duration": 1}, + {"duration": 8, "timestamp": 10, "duration": 1}, + ], + "sla": [{"success": True}], + "hooks": [], + "load_duration": 13, + "full_duration": 42 + }) + + db.task_delete(task_id) + res = db.task_result_get_all_by_uuid(task_id) + self.assertEqual(len(res), 0) + class SubtaskTestCase(test.DBTestCase): def setUp(self): @@ -544,7 +620,7 @@ class WorkloadTestCase(test.DBTestCase): } workload = db.workload_create(self.task_uuid, self.subtask_uuid, key) - db.workload_data_create(self.task_uuid, workload["uuid"], raw_data) + db.workload_data_create(self.task_uuid, workload["uuid"], 0, raw_data) workload = db.workload_set_results(workload["uuid"], data) self.assertEqual("atata", workload["name"]) self.assertEqual(0, workload["position"]) @@ -576,7 +652,6 @@ class WorkloadTestCase(test.DBTestCase): "runner": {"r": "R", "type": "T"} } } - raw_data = {"raw": []} data = { "sla": [ {"s": "S", "success": False}, @@ -588,7 +663,6 @@ class WorkloadTestCase(test.DBTestCase): } workload = db.workload_create(self.task_uuid, self.subtask_uuid, key) - db.workload_data_create(self.task_uuid, workload["uuid"], raw_data) workload = db.workload_set_results(workload["uuid"], data) self.assertEqual("atata", workload["name"]) self.assertEqual(0, workload["position"]) @@ -633,7 +707,7 @@ class WorkloadDataTestCase(test.DBTestCase): ] } workload_data = db.workload_data_create(self.task_uuid, - self.workload_uuid, data) + self.workload_uuid, 0, data) self.assertEqual(3, workload_data["iteration_count"]) self.assertEqual(1, workload_data["failed_iteration_count"]) self.assertEqual(dt.datetime.fromtimestamp(1), @@ -649,7 +723,7 @@ class WorkloadDataTestCase(test.DBTestCase): mock_time.return_value = 10 data = {"raw": []} workload_data = db.workload_data_create(self.task_uuid, - self.workload_uuid, data) + self.workload_uuid, 0, data) self.assertEqual(0, workload_data["iteration_count"]) self.assertEqual(0, workload_data["failed_iteration_count"]) self.assertEqual(dt.datetime.fromtimestamp(10), diff --git a/tests/unit/common/objects/test_task.py b/tests/unit/common/objects/test_task.py index 4f91702674..aef8cd2801 100644 --- a/tests/unit/common/objects/test_task.py +++ b/tests/unit/common/objects/test_task.py @@ -385,9 +385,10 @@ class WorkloadTestCase(test.TestCase): mock_workload_create.return_value = self.workload workload = objects.Workload("uuid1", "uuid2", {"bar": "baz"}) - workload = workload.add_workload_data({"data": "foo"}) + workload = workload.add_workload_data(0, {"data": "foo"}) mock_workload_data_create.assert_called_once_with( - self.workload["task_uuid"], self.workload["uuid"], {"data": "foo"}) + self.workload["task_uuid"], self.workload["uuid"], + 0, {"data": "foo"}) @mock.patch("rally.common.objects.task.db.workload_set_results") @mock.patch("rally.common.objects.task.db.workload_create") diff --git a/tests/unit/task/test_engine.py b/tests/unit/task/test_engine.py index b0a6989d1c..004f835315 100644 --- a/tests/unit/task/test_engine.py +++ b/tests/unit/task/test_engine.py @@ -550,7 +550,7 @@ class ResultConsumerTestCase(test.TestCase): key, task, subtask, workload, runner, False): pass - workload.add_workload_data.assert_called_once_with({"raw": []}) + self.assertFalse(workload.add_workload_data.called) workload.set_results.assert_called_once_with({ "full_duration": 1, "sla": mock_sla_results, @@ -671,6 +671,60 @@ class ResultConsumerTestCase(test.TestCase): mock_sla_instance.set_unexpected_failure.assert_has_calls( [mock.call(exc)]) + @mock.patch("rally.task.engine.CONF") + @mock.patch("rally.common.objects.Task.get_status") + @mock.patch("rally.task.engine.ResultConsumer.wait_and_abort") + @mock.patch("rally.task.sla.SLAChecker") + def test_consume_results_chunked( + self, mock_sla_checker, mock_result_consumer_wait_and_abort, + mock_task_get_status, mock_conf): + mock_conf.raw_result_chunk_size = 2 + mock_sla_instance = mock.MagicMock() + mock_sla_checker.return_value = mock_sla_instance + mock_task_get_status.return_value = consts.TaskStatus.RUNNING + key = {"kw": {"fake": 2}, "name": "fake", "pos": 0} + task = mock.MagicMock(spec=objects.Task) + subtask = mock.Mock(spec=objects.Subtask) + workload = mock.Mock(spec=objects.Workload) + runner = mock.MagicMock() + + results = [ + [{"duration": 1, "timestamp": 3}, + {"duration": 2, "timestamp": 2}, + {"duration": 3, "timestamp": 3}], + [{"duration": 4, "timestamp": 2}, + {"duration": 5, "timestamp": 3}], + [{"duration": 6, "timestamp": 2}], + [{"duration": 7, "timestamp": 1}], + ] + + runner.result_queue = collections.deque(results) + runner.event_queue = collections.deque() + with engine.ResultConsumer( + key, task, subtask, workload, runner, False) as consumer_obj: + pass + + mock_sla_instance.add_iteration.assert_has_calls([ + mock.call({"duration": 1, "timestamp": 3}), + mock.call({"duration": 2, "timestamp": 2}), + mock.call({"duration": 3, "timestamp": 3}), + mock.call({"duration": 4, "timestamp": 2}), + mock.call({"duration": 5, "timestamp": 3}), + mock.call({"duration": 6, "timestamp": 2}), + mock.call({"duration": 7, "timestamp": 1})]) + + self.assertEqual([{"duration": 7, "timestamp": 1}], + consumer_obj.results) + + workload.add_workload_data.assert_has_calls([ + mock.call(0, {"raw": [{"duration": 2, "timestamp": 2}, + {"duration": 1, "timestamp": 3}]}), + mock.call(1, {"raw": [{"duration": 4, "timestamp": 2}, + {"duration": 3, "timestamp": 3}]}), + mock.call(2, {"raw": [{"duration": 6, "timestamp": 2}, + {"duration": 5, "timestamp": 3}]}), + mock.call(3, {"raw": [{"duration": 7, "timestamp": 1}]})]) + @mock.patch("rally.task.engine.LOG") @mock.patch("rally.task.hook.HookExecutor") @mock.patch("rally.task.engine.time.time") @@ -719,7 +773,7 @@ class ResultConsumerTestCase(test.TestCase): mock.call(event_type="iteration", value=3) ]) - workload.add_workload_data.assert_called_once_with({"raw": []}) + self.assertFalse(workload.add_workload_data.called) workload.set_results.assert_called_once_with({ "full_duration": 1, "sla": mock_sla_results,