Asynchronous result yielding for scenario runners

The benchmark engine spawns a consumer thread, which asynchronously handles
results for each scenario run. The current consumer implementation simply
collects all results and stores them under the 'raw' key to preserve data
integrity.

Scenario runners now call the _send_result method for every single result
instead of collecting a list of results.
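
For illustration, a minimal, self-contained sketch of the producer/consumer
pattern this change introduces (the fake_runner producer below is
hypothetical; the real consumer logic lives in BenchmarkEngine.consume_results
in the diff that follows):

    import collections
    import threading
    import time

    result_queue = collections.deque()  # shared between runner and consumer
    is_done = threading.Event()

    def fake_runner():
        # Hypothetical producer: a real runner calls _send_result(), which
        # appends each validated result to result_queue as it is produced.
        for i in range(3):
            result_queue.append({"duration": 0.1 * i})

    def consume(collected):
        # Poll the deque until the runner signals completion and the queue
        # drains, mirroring BenchmarkEngine.consume_results below.
        while True:
            if result_queue:
                collected.append(result_queue.popleft())
            elif is_done.is_set():
                break
            else:
                time.sleep(0.1)

    collected = []
    consumer = threading.Thread(target=consume, args=(collected,))
    consumer.start()
    fake_runner()
    is_done.set()   # the engine sets this after runner.run() returns
    consumer.join()
    assert len(collected) == 3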

Change-Id: Icbff8b2470af3b3ced6bb55573b5842726177b70
uppi committed 2014-07-03 15:02:12 +04:00
parent d4a8ed80f8
commit 047b255fdf
11 changed files with 167 additions and 143 deletions

View File

@@ -14,6 +14,8 @@
# under the License.
import json
import threading
import time
import traceback
import jsonschema
@@ -185,18 +187,20 @@ class BenchmarkEngine(object):
corresponding benchmark test launches
"""
self.task.update_status(consts.TaskStatus.RUNNING)
results = {}
for name in self.config:
for n, kw in enumerate(self.config[name]):
key = {'name': name, 'pos': n, 'kw': kw}
LOG.info("Running benchmark with key: %s" % key)
runner = self._get_runner(kw)
result = runner.run(name, kw.get("context", {}),
kw.get("args", {}))
self.task.append_results(key, {"raw": result})
results[json.dumps(key)] = result
is_done = threading.Event()
consumer = threading.Thread(
target=self.consume_results,
args=(key, self.task, runner.result_queue, is_done))
consumer.start()
runner.run(name, kw.get("context", {}), kw.get("args", {}))
is_done.set()
consumer.join()
self.task.update_status(consts.TaskStatus.FINISHED)
return results
@rutils.log_task_wrapper(LOG.info, _("Check cloud."))
def bind(self, endpoints):
@@ -211,3 +215,27 @@ class BenchmarkEngine(object):
clients = osclients.Clients(self.admin_endpoint)
clients.verified_keystone()
return self
@staticmethod
def consume_results(key, task, result_queue, is_done):
"""Consume scenario runner results from queue and send them to db.
Has to be run in a separate thread concurrently with the runner.run
method.
:param key: Scenario identifier
:param task: Running task
:param result_queue: Deque with runner results
:param is_done: Event which is set from the runner thread after the
runner finishes its work.
"""
results = []
while True:
if result_queue:
result = result_queue.popleft()
results.append(result)
elif is_done.isSet():
break
else:
time.sleep(0.1)
task.append_results(key, {"raw": results})
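
A note on the design: the consumer deliberately polls the deque with a short
sleep instead of blocking on a synchronized queue, since deque.append and
deque.popleft are thread-safe and polling keeps the runner side free of
locking. A sketch of exercising consume_results in isolation (FakeTask is a
hypothetical stand-in; it only needs the append_results method the consumer
calls):

    import collections
    import threading

    from rally.benchmark.engine import BenchmarkEngine

    class FakeTask(object):
        # Hypothetical stand-in for a Rally task object.
        def __init__(self):
            self.stored = []

        def append_results(self, key, value):
            self.stored.append((key, value))

    queue = collections.deque([{"duration": 1.0}, {"duration": 2.0}])
    is_done = threading.Event()
    is_done.set()  # pretend the runner has already finished

    task = FakeTask()
    BenchmarkEngine.consume_results({"name": "demo", "pos": 0, "kw": {}},
                                    task, queue, is_done)
    # task.stored now holds a single (key, {"raw": [result, result]}) pair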

View File

@@ -14,6 +14,7 @@
# under the License.
import abc
import collections
import copy
import random
@@ -77,56 +78,53 @@ def _run_scenario_once(args):
"atomic_actions": scenario.atomic_actions()}
class ScenarioRunnerResult(list):
class ScenarioRunnerResult(dict):
"""Class for all scenario runners' result."""
RESULT_SCHEMA = {
"type": "array",
"type": "object",
"$schema": rutils.JSON_SCHEMA,
"items": {
"type": "object",
"properties": {
"duration": {
"type": "number"
"properties": {
"duration": {
"type": "number"
},
"idle_duration": {
"type": "number"
},
"scenario_output": {
"type": "object",
"properties": {
"data": {
"type": "object",
"patternProperties": {
".*": {"type": "number"}
}
},
"errors": {
"type": "string"
},
},
"idle_duration": {
"type": "number"
},
"scenario_output": {
"additionalProperties": False
},
"atomic_actions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"data": {
"type": "object",
"patternProperties": {
".*": {"type": "number"}
}
},
"errors": {
"type": "string"
},
"action": {"type": "string"},
"duration": {"type": "number"}
},
"additionalProperties": False
},
"atomic_actions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"action": {"type": "string"},
"duration": {"type": "number"}
},
"additionalProperties": False
}
},
"error": {
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": False
}
"error": {
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": False
}
def __init__(self, result_list):
@@ -154,6 +152,7 @@ class ScenarioRunner(object):
# a single admin endpoint here.
self.admin_user = endpoints[0]
self.config = config
self.result_queue = collections.deque()
@staticmethod
def _get_cls(runner_type):
@@ -208,13 +207,14 @@
}
args = cls.preprocess(method_name, context_obj, args)
results = base_ctx.ContextManager.run(context_obj, self._run_scenario,
cls, method_name, args)
base_ctx.ContextManager.run(context_obj, self._run_scenario,
cls, method_name, args)
if not isinstance(results, ScenarioRunnerResult):
name = self.__execution_type__
results_type = type(results)
raise exceptions.InvalidRunnerResult(name=name,
results_type=results_type)
def _send_result(self, result):
"""Send partial result to consumer.
return results
:param result: Result dict to be sent. It should match the
ScenarioRunnerResult schema, otherwise
ValidationError is raised.
"""
self.result_queue.append(ScenarioRunnerResult(result))
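
With RESULT_SCHEMA now describing a single result dict instead of a list,
each call to _send_result validates exactly one result. A hedged example of
a dict that should pass the schema above, given any ScenarioRunner instance
runner (all values are made up):

    result = {
        "duration": 1.42,
        "idle_duration": 0.0,
        "scenario_output": {"data": {"requests": 10.0}, "errors": ""},
        "atomic_actions": [{"action": "nova.boot_server", "duration": 1.3}],
        "error": [],
    }
    # Wraps the dict in ScenarioRunnerResult (triggering jsonschema
    # validation) and appends it to self.result_queue.
    runner._send_result(result)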

View File

@@ -82,9 +82,6 @@ class ConstantScenarioRunner(base.ScenarioRunner):
iter_result = pool.imap(base._run_scenario_once,
self._iter_scenario_args(cls, method, context,
args, times))
results = []
for i in range(times):
try:
result = iter_result.next(timeout)
@@ -93,13 +90,11 @@ class ConstantScenarioRunner(base.ScenarioRunner):
"scenario_output": {},
"atomic_actions": [],
"error": utils.format_exc(e)}
results.append(result)
self._send_result(result)
pool.close()
pool.join()
return base.ScenarioRunnerResult(results)
class ConstantForDurationScenarioRunner(base.ScenarioRunner):
"""Creates constant load executing a scenario for an interval of time.
@@ -158,7 +153,6 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner):
self._iter_scenario_args(cls, method, context, args))
iter_result = pool.imap(base._run_scenario_once, run_args)
results = []
start = time.time()
while True:
try:
@@ -168,12 +162,10 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner):
"scenario_output": {},
"atomic_actions": [],
"error": utils.format_exc(e)}
results.append(result)
self._send_result(result)
if time.time() - start > duration:
break
pool.terminate()
pool.join()
return base.ScenarioRunnerResult(results)

View File

@@ -85,7 +85,6 @@ class PeriodicScenarioRunner(base.ScenarioRunner):
if i < times - 1:
time.sleep(period)
results = []
for async_result in async_results:
try:
result = async_result.get(timeout=timeout)
@@ -94,9 +93,7 @@ class PeriodicScenarioRunner(base.ScenarioRunner):
"scenario_output": {},
"atomic_actions": [],
"error": utils.format_exc(e)}
results.append(result)
self._send_result(result)
for pool in pools:
pool.join()
return base.ScenarioRunnerResult(results)

View File

@@ -50,12 +50,8 @@ class SerialScenarioRunner(base.ScenarioRunner):
def _run_scenario(self, cls, method_name, context, args):
times = self.config.get('times', 1)
results = []
for i in range(times):
run_args = (i, cls, method_name,
base._get_scenario_context(context), args)
result = base._run_scenario_once(run_args)
results.append(result)
return base.ScenarioRunnerResult(results)
self._send_result(result)

View File

@@ -147,10 +147,11 @@ class ScenarioRunnerResultTestCase(test.TestCase):
}
]
self.assertEqual(config, base.ScenarioRunnerResult(config))
self.assertEqual(config[0], base.ScenarioRunnerResult(config[0]))
self.assertEqual(config[1], base.ScenarioRunnerResult(config[1]))
def test_validate_failed(self):
config = [{"a": 10}]
config = {"a": 10}
self.assertRaises(jsonschema.ValidationError,
base.ScenarioRunnerResult, config)
@@ -207,15 +208,15 @@ class ScenarioRunnerTestCase(test.TestCase):
@mock.patch("rally.benchmark.runners.base.osclients")
@mock.patch("rally.benchmark.runners.base.base_ctx.ContextManager")
def test_run(self, mock_ctx_manager, mock_osclients):
runner = constant.ConstantScenarioRunner(mock.MagicMock(),
self.fake_endpoints,
mock.MagicMock())
mock_ctx_manager.run.return_value = base.ScenarioRunnerResult([])
runner = constant.ConstantScenarioRunner(
mock.MagicMock(),
self.fake_endpoints,
mock.MagicMock())
scenario_name = "NovaServers.boot_server_from_volume_and_delete"
config_kwargs = {"image": {"id": 1}, "flavor": {"id": 1}}
result = runner.run(scenario_name, {"some_ctx": 2}, config_kwargs)
runner.run(scenario_name, {"some_ctx": 2}, config_kwargs)
self.assertEqual(result, mock_ctx_manager.run.return_value)
self.assertEqual(list(runner.result_queue), [])
cls_name, method_name = scenario_name.split(".", 1)
cls = base_scenario.Scenario.get_by_name(cls_name)
@@ -233,12 +234,11 @@
method_name, config_kwargs]
mock_ctx_manager.run.assert_called_once_with(*expected)
@mock.patch("rally.benchmark.runners.base.base_ctx.ContextManager")
def test_run_scenario_runner_results_exception(self, mock_ctx_manager):
srunner_cls = constant.ConstantForDurationScenarioRunner
srunner = srunner_cls(mock.MagicMock(), self.fake_endpoints,
mock.MagicMock())
self.assertRaises(exceptions.InvalidRunnerResult,
srunner.run,
"NovaServers.boot_server_from_volume_and_delete",
mock.MagicMock(), {})
def test_runner_send_result_exception(self):
runner = constant.ConstantScenarioRunner(
mock.MagicMock(),
self.fake_endpoints,
mock.MagicMock())
self.assertRaises(
jsonschema.ValidationError,
lambda: runner._send_result(mock.MagicMock()))

View File

@@ -49,31 +49,33 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
runner = constant.ConstantScenarioRunner(
None, [self.context["admin"]["endpoint"]], self.config)
result = runner._run_scenario(fakes.FakeScenario, "do_it",
self.context, self.args)
self.assertEqual(len(result), self.config["times"])
self.assertIsNotNone(base.ScenarioRunnerResult(result))
runner._run_scenario(fakes.FakeScenario,
"do_it", self.context, self.args)
self.assertEqual(len(runner.result_queue), self.config["times"])
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
def test_run_scenario_constantly_for_times_exception(self):
runner = constant.ConstantScenarioRunner(
None, [self.context["admin"]["endpoint"]], self.config)
result = runner._run_scenario(fakes.FakeScenario,
"something_went_wrong",
self.context, self.args)
self.assertEqual(len(result), self.config["times"])
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', result[0])
runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", self.context, self.args)
self.assertEqual(len(runner.result_queue), self.config["times"])
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', runner.result_queue[0])
def test_run_scenario_constantly_for_times_timeout(self):
runner = constant.ConstantScenarioRunner(
None, [self.context["admin"]["endpoint"]], self.config)
result = runner._run_scenario(fakes.FakeScenario,
"raise_timeout", self.context, self.args)
self.assertEqual(len(result), self.config["times"])
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', result[0])
runner._run_scenario(fakes.FakeScenario,
"raise_timeout", self.context, self.args)
self.assertEqual(len(runner.result_queue), self.config["times"])
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', runner.result_queue[0])
class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
@@ -103,34 +105,36 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
runner = constant.ConstantForDurationScenarioRunner(
None, [self.context["admin"]["endpoint"]], self.config)
result = runner._run_scenario(fakes.FakeScenario, "do_it",
self.context, self.args)
runner._run_scenario(fakes.FakeScenario, "do_it",
self.context, self.args)
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
self.assertEqual(len(result), expected_times)
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertEqual(len(runner.result_queue), expected_times)
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
def test_run_scenario_constantly_for_duration_exception(self):
runner = constant.ConstantForDurationScenarioRunner(
None, [self.context["admin"]["endpoint"]], self.config)
result = runner._run_scenario(fakes.FakeScenario,
"something_went_wrong",
self.context, self.args)
runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", self.context, self.args)
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
self.assertEqual(len(result), expected_times)
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', result[0])
self.assertEqual(len(runner.result_queue), expected_times)
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', runner.result_queue[0])
def test_run_scenario_constantly_for_duration_timeout(self):
runner = constant.ConstantForDurationScenarioRunner(
None, [self.context["admin"]["endpoint"]], self.config)
None, [self.context["admin"]["endpoint"]], self.config)
result = runner._run_scenario(fakes.FakeScenario,
"raise_timeout", self.context, self.args)
runner._run_scenario(fakes.FakeScenario,
"raise_timeout", self.context, self.args)
# NOTE(mmorais): when duration is 0, scenario executes exactly 1 time
expected_times = 1
self.assertEqual(len(result), expected_times)
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', result[0])
self.assertEqual(len(runner.result_queue), expected_times)
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn('error', runner.result_queue[0])

View File

@@ -54,9 +54,10 @@ class PeriodicScenarioRunnerTestCase(test.TestCase):
runner = periodic.PeriodicScenarioRunner(
None, [context["admin"]["endpoint"]], config)
result = runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
self.assertEqual(len(result), config["times"])
self.assertIsNotNone(base.ScenarioRunnerResult(result))
runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
self.assertEqual(len(runner.result_queue), config["times"])
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
def test_run_scenario_exception(self):
context = fakes.FakeUserContext({}).context
@@ -66,10 +67,11 @@ class PeriodicScenarioRunnerTestCase(test.TestCase):
runner = periodic.PeriodicScenarioRunner(
None, [context["admin"]["endpoint"]], config)
result = runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", context, {})
self.assertEqual(len(result), config["times"])
self.assertIsNotNone(base.ScenarioRunnerResult(result))
runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", context, {})
self.assertEqual(len(runner.result_queue), config["times"])
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
@mock.patch("rally.benchmark.runners.periodic.base.ScenarioRunnerResult")
@mock.patch("rally.benchmark.runners.periodic.multiprocessing")

View File

@@ -41,7 +41,8 @@ class SerialScenarioRunnerTestCase(test.TestCase):
runner = serial.SerialScenarioRunner(mock.MagicMock(),
self.fake_endpoints,
{"times": times})
results = runner._run_scenario(fakes.FakeScenario, "do_it",
fakes.FakeUserContext({}).context, {})
self.assertEqual(mock_run_once.call_count, times)
runner._run_scenario(fakes.FakeScenario, "do_it",
fakes.FakeUserContext({}).context, {})
self.assertEqual(len(runner.result_queue), times)
results = list(runner.result_queue)
self.assertEqual(results, expected_results)

View File

@@ -220,21 +220,22 @@ class BenchmarkEngineTestCase(test.TestCase):
]
mock_helper.assert_has_calls(expected_calls)
def test_run__update_status(self):
@mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results")
def test_run__update_status(self, mock_consume):
task = mock.MagicMock()
eng = engine.BenchmarkEngine([], task)
results = eng.run()
self.assertEqual(results, {})
eng.run()
task.update_status.assert_has_calls([
mock.call(consts.TaskStatus.RUNNING),
mock.call(consts.TaskStatus.FINISHED)
])
@mock.patch("rally.benchmark.engine.endpoint.Endpoint")
@mock.patch("rally.benchmark.engine.osclients")
@mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results")
@mock.patch("rally.benchmark.engine.base_runner.ScenarioRunner")
@mock.patch("rally.benchmark.engine.osclients")
@mock.patch("rally.benchmark.engine.endpoint.Endpoint")
def test_run__config_has_args(self, mock_endpoint, mock_osclients,
mock_runner):
mock_runner, mock_consume):
config = {
"a.args": [{"args": {"a": "a", "b": 1}}],
"b.args": [{"args": {"a": 1}}]
@@ -243,11 +244,12 @@ class BenchmarkEngineTestCase(test.TestCase):
eng = engine.BenchmarkEngine(config, task).bind([{}])
eng.run()
@mock.patch("rally.benchmark.engine.endpoint.Endpoint")
@mock.patch("rally.benchmark.engine.osclients")
@mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results")
@mock.patch("rally.benchmark.engine.base_runner.ScenarioRunner")
@mock.patch("rally.benchmark.engine.osclients")
@mock.patch("rally.benchmark.engine.endpoint.Endpoint")
def test_run__config_has_runner(self, mock_endpoint, mock_osclients,
mock_runner):
mock_runner, mock_consume):
config = {
"a.args": [{"runner": {"type": "a", "b": 1}}],
"b.args": [{"runner": {"a": 1}}]
@@ -256,11 +258,12 @@ class BenchmarkEngineTestCase(test.TestCase):
eng = engine.BenchmarkEngine(config, task).bind([{}])
eng.run()
@mock.patch("rally.benchmark.engine.endpoint.Endpoint")
@mock.patch("rally.benchmark.engine.osclients")
@mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results")
@mock.patch("rally.benchmark.engine.base_runner.ScenarioRunner")
@mock.patch("rally.benchmark.engine.osclients")
@mock.patch("rally.benchmark.engine.endpoint.Endpoint")
def test_run__config_has_context(self, mock_endpoint, mock_osclients,
mock_runner):
mock_runner, mock_consume):
config = {
"a.args": [{"context": {"context_a": {"a": 1}}}],
"b.args": [{"context": {"context_b": {"b": 2}}}]

View File

@@ -15,6 +15,7 @@
""" Test for orchestrator. """
import collections
import uuid
import mock
@@ -123,7 +124,7 @@ class APITestCase(test.TestCase):
mock_deploy_get.return_value = self.deployment
mock_utils_runner.return_value = mock_runner = mock.Mock()
mock_runner.run.return_value = ['fake_result']
mock_runner.result_queue = collections.deque(['fake_result'])
mock_osclients.Clients.return_value = fakes.FakeClients()