Implement abort in scenario runners
* Implement abort() in all the scenario classes * Add a Dummy scenario for serial & constant_for_duration runners to the gate job Change-Id: I526d594ea9a185fc7d6e3add0f119b529b4a070a
This commit is contained in:
parent
717de8291e
commit
4bb12807c9
@ -230,6 +230,17 @@
|
||||
failure_rate:
|
||||
max: 0
|
||||
|
||||
-
|
||||
args:
|
||||
sleep: 0.1
|
||||
runner:
|
||||
type: "constant_for_duration"
|
||||
duration: 5
|
||||
concurrency: 5
|
||||
sla:
|
||||
failure_rate:
|
||||
max: 0
|
||||
|
||||
-
|
||||
args:
|
||||
sleep: 0.001
|
||||
@ -241,6 +252,16 @@
|
||||
failure_rate:
|
||||
max: 0
|
||||
|
||||
-
|
||||
args:
|
||||
sleep: 0.1
|
||||
runner:
|
||||
type: "serial"
|
||||
times: 20
|
||||
sla:
|
||||
failure_rate:
|
||||
max: 0
|
||||
|
||||
-
|
||||
args:
|
||||
sleep: 0.01
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import abc
|
||||
import collections
|
||||
import multiprocessing
|
||||
import random
|
||||
|
||||
import jsonschema
|
||||
@ -169,6 +170,7 @@ class ScenarioRunner(object):
|
||||
self.task = task
|
||||
self.config = config
|
||||
self.result_queue = collections.deque()
|
||||
self.aborted = multiprocessing.Event()
|
||||
|
||||
@staticmethod
|
||||
def _get_cls(runner_type):
|
||||
@ -212,6 +214,8 @@ class ScenarioRunner(object):
|
||||
cls_name, method_name = name.split(".", 1)
|
||||
cls = scenario_base.Scenario.get_by_name(cls_name)
|
||||
|
||||
self.aborted.clear()
|
||||
|
||||
# NOTE(boris-42): processing @types decorators
|
||||
args = types.preprocess(cls, method_name, context, args)
|
||||
|
||||
@ -219,6 +223,10 @@ class ScenarioRunner(object):
|
||||
self._run_scenario(cls, method_name, context, args)
|
||||
return timer.duration()
|
||||
|
||||
def abort(self):
|
||||
"""Abort the execution of further benchmark scenario iterations."""
|
||||
self.aborted.set()
|
||||
|
||||
def _send_result(self, result):
|
||||
"""Send partial result to consumer.
|
||||
|
||||
|
@ -68,12 +68,13 @@ class ConstantScenarioRunner(base.ScenarioRunner):
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _iter_scenario_args(cls, method, ctx, args, times):
|
||||
def _iter_scenario_args(cls, method, ctx, args, times, aborted):
|
||||
for i in moves.range(times):
|
||||
if aborted.is_set():
|
||||
break
|
||||
yield (i, cls, method, base._get_scenario_context(ctx), args)
|
||||
|
||||
def _run_scenario(self, cls, method, context, args):
|
||||
|
||||
timeout = self.config.get("timeout", 600)
|
||||
concurrency = self.config.get("concurrency", 1)
|
||||
# NOTE(msdubov): If not specified, perform single scenario run.
|
||||
@ -82,12 +83,15 @@ class ConstantScenarioRunner(base.ScenarioRunner):
|
||||
pool = multiprocessing.Pool(concurrency)
|
||||
iter_result = pool.imap(base._run_scenario_once,
|
||||
self._iter_scenario_args(cls, method, context,
|
||||
args, times))
|
||||
for i in range(times):
|
||||
args, times,
|
||||
self.aborted))
|
||||
while True:
|
||||
try:
|
||||
result = iter_result.next(timeout)
|
||||
except multiprocessing.TimeoutError as e:
|
||||
result = base.format_result_on_timeout(e, timeout)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
self._send_result(result)
|
||||
|
||||
@ -135,13 +139,14 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner):
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _iter_scenario_args(cls, method, ctx, args):
|
||||
def _iter_scenario_args(cls, method, ctx, args, aborted):
|
||||
def _scenario_args(i):
|
||||
if aborted.is_set():
|
||||
raise StopIteration()
|
||||
return (i, cls, method, base._get_scenario_context(ctx), args)
|
||||
return _scenario_args
|
||||
|
||||
def _run_scenario(self, cls, method, context, args):
|
||||
|
||||
timeout = self.config.get("timeout", 600)
|
||||
concurrency = self.config.get("concurrency", 1)
|
||||
duration = self.config.get("duration")
|
||||
@ -149,7 +154,8 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner):
|
||||
pool = multiprocessing.Pool(concurrency)
|
||||
|
||||
run_args = utils.infinite_run_args_generator(
|
||||
self._iter_scenario_args(cls, method, context, args))
|
||||
self._iter_scenario_args(cls, method, context, args,
|
||||
self.aborted))
|
||||
iter_result = pool.imap(base._run_scenario_once, run_args)
|
||||
|
||||
start = time.time()
|
||||
@ -158,6 +164,8 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner):
|
||||
result = iter_result.next(timeout)
|
||||
except multiprocessing.TimeoutError as e:
|
||||
result = base.format_result_on_timeout(e, timeout)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
self._send_result(result)
|
||||
|
||||
|
@ -28,11 +28,11 @@ SEND_RESULT_DELAY = 1
|
||||
|
||||
|
||||
def _worker_thread(queue, args):
|
||||
queue.put(base._run_scenario_once(args))
|
||||
queue.put(base._run_scenario_once(args))
|
||||
|
||||
|
||||
def _worker_process(rps, times, queue, context, timeout,
|
||||
worker_id, workers, cls, method_name, args):
|
||||
worker_id, workers, cls, method_name, args, aborted):
|
||||
"""Start scenario within threads.
|
||||
|
||||
Spawn N threads per second. Each thread runs scenario once, and appends
|
||||
@ -48,6 +48,7 @@ def _worker_process(rps, times, queue, context, timeout,
|
||||
:param cls: scenario class
|
||||
:param method_name: scenario method name
|
||||
:param args: scenario args
|
||||
:param aborted: multiprocessing.Event that is an abort flag
|
||||
"""
|
||||
|
||||
pool = []
|
||||
@ -61,7 +62,7 @@ def _worker_process(rps, times, queue, context, timeout,
|
||||
randsleep_delay = random.randint(int(sleep / 2 * 100), int(sleep * 100))
|
||||
time.sleep(randsleep_delay / 100.0)
|
||||
|
||||
while times > i:
|
||||
while times > i and not aborted.is_set():
|
||||
scenario_context = base._get_scenario_context(context)
|
||||
i += 1
|
||||
scenario_args = (queue, (worker_id + workers * (i - 1), cls,
|
||||
@ -145,7 +146,7 @@ class RPSScenarioRunner(base.ScenarioRunner):
|
||||
rest -= 1
|
||||
worker_args = (rps_per_worker, times, queue, context,
|
||||
timeout, i, processes_to_start, cls,
|
||||
method_name, args)
|
||||
method_name, args, self.aborted)
|
||||
process = multiprocessing.Process(target=_worker_process,
|
||||
args=worker_args)
|
||||
process.start()
|
||||
|
@ -50,6 +50,8 @@ class SerialScenarioRunner(base.ScenarioRunner):
|
||||
times = self.config.get("times", 1)
|
||||
|
||||
for i in range(times):
|
||||
if self.aborted.is_set():
|
||||
break
|
||||
run_args = (i, cls, method_name,
|
||||
base._get_scenario_context(context), args)
|
||||
result = base._run_scenario_once(run_args)
|
||||
|
@ -267,3 +267,11 @@ class ScenarioRunnerTestCase(test.TestCase):
|
||||
self.assertRaises(
|
||||
jsonschema.ValidationError,
|
||||
lambda: runner._send_result(mock.MagicMock()))
|
||||
|
||||
def test_abort(self):
|
||||
runner = serial.SerialScenarioRunner(
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
self.assertFalse(runner.aborted.is_set())
|
||||
runner.abort()
|
||||
self.assertTrue(runner.aborted.is_set())
|
||||
|
@ -45,9 +45,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
|
||||
constant.ConstantScenarioRunner.validate,
|
||||
self.config)
|
||||
|
||||
def test_run_scenario_constantly_for_times(self):
|
||||
runner = constant.ConstantScenarioRunner(
|
||||
None, self.config)
|
||||
def test__run_scenario_constantly_for_times(self):
|
||||
runner = constant.ConstantScenarioRunner(None, self.config)
|
||||
|
||||
runner._run_scenario(fakes.FakeScenario,
|
||||
"do_it", self.context, self.args)
|
||||
@ -55,9 +54,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
|
||||
for result in runner.result_queue:
|
||||
self.assertIsNotNone(base.ScenarioRunnerResult(result))
|
||||
|
||||
def test_run_scenario_constantly_for_times_exception(self):
|
||||
runner = constant.ConstantScenarioRunner(
|
||||
None, self.config)
|
||||
def test__run_scenario_constantly_for_times_exception(self):
|
||||
runner = constant.ConstantScenarioRunner(None, self.config)
|
||||
|
||||
runner._run_scenario(fakes.FakeScenario,
|
||||
"something_went_wrong", self.context, self.args)
|
||||
@ -66,9 +64,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
|
||||
self.assertIsNotNone(base.ScenarioRunnerResult(result))
|
||||
self.assertIn("error", runner.result_queue[0])
|
||||
|
||||
def test_run_scenario_constantly_for_times_timeout(self):
|
||||
runner = constant.ConstantScenarioRunner(
|
||||
None, self.config)
|
||||
def test__run_scenario_constantly_for_times_timeout(self):
|
||||
runner = constant.ConstantScenarioRunner(None, self.config)
|
||||
|
||||
runner._run_scenario(fakes.FakeScenario,
|
||||
"raise_timeout", self.context, self.args)
|
||||
@ -77,11 +74,25 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
|
||||
self.assertIsNotNone(base.ScenarioRunnerResult(result))
|
||||
self.assertIn("error", runner.result_queue[0])
|
||||
|
||||
def test__run_scenario_constantly_aborted(self):
|
||||
runner = constant.ConstantScenarioRunner(None, self.config)
|
||||
|
||||
class ConstantForDurationScenarioRunnerTeestCase(test.TestCase):
|
||||
runner.abort()
|
||||
runner._run_scenario(fakes.FakeScenario,
|
||||
"do_it", self.context, self.args)
|
||||
self.assertEqual(len(runner.result_queue), 0)
|
||||
|
||||
def test_abort(self):
|
||||
runner = constant.ConstantScenarioRunner(None, self.config)
|
||||
self.assertFalse(runner.aborted.is_set())
|
||||
runner.abort()
|
||||
self.assertTrue(runner.aborted.is_set())
|
||||
|
||||
|
||||
class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ConstantForDurationScenarioRunnerTeestCase, self).setUp()
|
||||
super(ConstantForDurationScenarioRunnerTestCase, self).setUp()
|
||||
duration = 0
|
||||
concurrency = 2
|
||||
timeout = 2
|
||||
@ -138,3 +149,17 @@ class ConstantForDurationScenarioRunnerTeestCase(test.TestCase):
|
||||
for result in runner.result_queue:
|
||||
self.assertIsNotNone(base.ScenarioRunnerResult(result))
|
||||
self.assertIn("error", runner.result_queue[0])
|
||||
|
||||
def test__run_scenario_constantly_aborted(self):
|
||||
runner = constant.ConstantForDurationScenarioRunner(None, self.config)
|
||||
|
||||
runner.abort()
|
||||
runner._run_scenario(fakes.FakeScenario,
|
||||
"do_it", self.context, self.args)
|
||||
self.assertEqual(len(runner.result_queue), 0)
|
||||
|
||||
def test_abort(self):
|
||||
runner = constant.ConstantForDurationScenarioRunner(None, self.config)
|
||||
self.assertFalse(runner.aborted.is_set())
|
||||
runner.abort()
|
||||
self.assertTrue(runner.aborted.is_set())
|
||||
|
@ -75,13 +75,16 @@ class RPSScenarioRunnerTestCase(test.TestCase):
|
||||
isAlive=mock.MagicMock(return_value=False))
|
||||
mock_thread.return_value = mock_thread_instance
|
||||
|
||||
mock_event = mock.MagicMock(
|
||||
is_set=mock.MagicMock(return_value=False))
|
||||
|
||||
times = 4
|
||||
|
||||
context = {"users": [{"tenant_id": "t1", "endpoint": "e1",
|
||||
"id": "uuid1"}]}
|
||||
|
||||
rps._worker_process(10, times, mock_queue, context, 600, 1, 1,
|
||||
"Dummy", "dummy", ())
|
||||
"Dummy", "dummy", (), mock_event)
|
||||
|
||||
self.assertEqual(times, mock_log.debug.call_count)
|
||||
self.assertEqual(times, mock_thread.call_count)
|
||||
@ -120,9 +123,9 @@ class RPSScenarioRunnerTestCase(test.TestCase):
|
||||
def test__run_scenario(self, mock_sleep):
|
||||
context = fakes.FakeUserContext({}).context
|
||||
context["task"] = {"uuid": "fake_uuid"}
|
||||
|
||||
config = {"times": 20, "rps": 20, "timeout": 5}
|
||||
runner = rps.RPSScenarioRunner(
|
||||
None, config)
|
||||
runner = rps.RPSScenarioRunner(None, config)
|
||||
|
||||
runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
|
||||
|
||||
@ -137,11 +140,37 @@ class RPSScenarioRunnerTestCase(test.TestCase):
|
||||
context["task"] = {"uuid": "fake_uuid"}
|
||||
|
||||
config = {"times": 4, "rps": 10}
|
||||
runner = rps.RPSScenarioRunner(
|
||||
None, config)
|
||||
runner = rps.RPSScenarioRunner(None, config)
|
||||
|
||||
runner._run_scenario(fakes.FakeScenario,
|
||||
"something_went_wrong", context, {})
|
||||
self.assertEqual(len(runner.result_queue), config["times"])
|
||||
for result in runner.result_queue:
|
||||
self.assertIsNotNone(base.ScenarioRunnerResult(result))
|
||||
|
||||
@mock.patch("rally.benchmark.runners.rps.time.sleep")
|
||||
def test__run_scenario_aborted(self, mock_sleep):
|
||||
context = fakes.FakeUserContext({}).context
|
||||
context["task"] = {"uuid": "fake_uuid"}
|
||||
|
||||
config = {"times": 20, "rps": 20, "timeout": 5}
|
||||
runner = rps.RPSScenarioRunner(None, config)
|
||||
|
||||
runner.abort()
|
||||
runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
|
||||
|
||||
self.assertEqual(len(runner.result_queue), 0)
|
||||
|
||||
for result in runner.result_queue:
|
||||
self.assertIsNotNone(base.ScenarioRunnerResult(result))
|
||||
|
||||
def test_abort(self):
|
||||
context = fakes.FakeUserContext({}).context
|
||||
context["task"] = {"uuid": "fake_uuid"}
|
||||
|
||||
config = {"times": 4, "rps": 10}
|
||||
runner = rps.RPSScenarioRunner(None, config)
|
||||
|
||||
self.assertFalse(runner.aborted.is_set())
|
||||
runner.abort()
|
||||
self.assertTrue(runner.aborted.is_set())
|
||||
|
@ -26,7 +26,7 @@ class SerialScenarioRunnerTestCase(test.TestCase):
|
||||
super(SerialScenarioRunnerTestCase, self).setUp()
|
||||
|
||||
@mock.patch("rally.benchmark.runners.base._run_scenario_once")
|
||||
def test_run_scenario(self, mock_run_once):
|
||||
def test__run_scenario(self, mock_run_once):
|
||||
times = 5
|
||||
result = {"duration": 10, "idle_duration": 0, "error": [],
|
||||
"scenario_output": {}, "atomic_actions": {}}
|
||||
@ -40,3 +40,18 @@ class SerialScenarioRunnerTestCase(test.TestCase):
|
||||
self.assertEqual(len(runner.result_queue), times)
|
||||
results = list(runner.result_queue)
|
||||
self.assertEqual(results, expected_results)
|
||||
|
||||
def test__run_scenario_aborted(self):
|
||||
runner = serial.SerialScenarioRunner(mock.MagicMock(),
|
||||
{"times": 5})
|
||||
runner.abort()
|
||||
runner._run_scenario(fakes.FakeScenario, "do_it",
|
||||
fakes.FakeUserContext({}).context, {})
|
||||
self.assertEqual(len(runner.result_queue), 0)
|
||||
|
||||
def test_abort(self):
|
||||
runner = serial.SerialScenarioRunner(mock.MagicMock(),
|
||||
{"times": 5})
|
||||
self.assertFalse(runner.aborted.is_set())
|
||||
runner.abort()
|
||||
self.assertTrue(runner.aborted.is_set())
|
||||
|
Loading…
Reference in New Issue
Block a user