Move ScenarioRunner class to a separate module

For the sake of good structural code organization, we move here the central
benchmark engine class, namely the ScenarioRunner, to a separate module. We
also change the test code correspondingly and slightly rename some of the
methods left in the 'utils' module.

Blueprint benchmark-engine-refactoring

Change-Id: Id2cdad7f34c2ce067ad155c13166e256072a47e6
This commit is contained in:
Mikhail Dubov
2013-12-25 23:42:00 +04:00
parent 4ff00b930a
commit efb508b726
8 changed files with 429 additions and 408 deletions

View File

@@ -17,7 +17,7 @@ import json
import jsonschema
from rally.benchmark import base
from rally.benchmark import utils
from rally.benchmark import runner
from rally import consts
from rally import exceptions
from rally.openstack.common.gettextutils import _
@@ -128,13 +128,13 @@ class TestEngine(object):
corresponding benchmark test launches
"""
self.task.update_status(consts.TaskStatus.TEST_TOOL_BENCHMARKING)
runer = utils.ScenarioRunner(self.task, self.endpoints)
scenario_runner = runner.ScenarioRunner(self.task, self.endpoints)
results = {}
for name in self.config:
for n, kwargs in enumerate(self.config[name]):
key = {'name': name, 'pos': n, 'kw': kwargs}
result = runer.run(name, kwargs)
result = scenario_runner.run(name, kwargs)
self.task.append_results(key, {"raw": result})
results[json.dumps(key)] = result
return results

292
rally/benchmark/runner.py Normal file
View File

@@ -0,0 +1,292 @@
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import multiprocessing
from multiprocessing import pool as multiprocessing_pool
import random
import time
import uuid
from rally.benchmark import base
from rally.benchmark import cleanup_utils
from rally.benchmark import utils
from rally.openstack.common.gettextutils import _
from rally.openstack.common import log as logging
from rally import utils as rutils
LOG = logging.getLogger(__name__)
# NOTE(msdubov): These objects are shared between multiple scenario processes.
__openstack_clients__ = []
__admin_clients__ = {}
__scenario_context__ = {}
def _run_scenario_loop(args):
i, cls, method_name, kwargs = args
LOG.info("ITER: %s" % i)
# NOTE(msdubov): Each scenario run uses a random openstack client
# from a predefined set to act from different users.
cls._clients = random.choice(__openstack_clients__)
cls._admin_clients = __admin_clients__
cls._context = __scenario_context__
cls.idle_time = 0
try:
with rutils.Timer() as timer:
getattr(cls, method_name)(**kwargs)
except Exception as e:
return {"time": timer.duration() - cls.idle_time,
"idle_time": cls.idle_time, "error": utils.format_exc(e)}
return {"time": timer.duration() - cls.idle_time,
"idle_time": cls.idle_time, "error": None}
class ScenarioRunner(object):
"""Tool that gets and runs one Scenario."""
def __init__(self, task, cloud_config):
self.task = task
self.endpoints = cloud_config
global __admin_clients__
keys = ["admin_username", "admin_password", "admin_tenant_name", "uri"]
__admin_clients__ = utils.create_openstack_clients([self.endpoints],
keys)[0]
base.Scenario.register()
def _create_temp_tenants_and_users(self, tenants, users_per_tenant):
run_id = str(uuid.uuid4())
self.tenants = [__admin_clients__["keystone"].tenants.create(
"temp_%(rid)s_tenant_%(iter)i" % {"rid": run_id,
"iter": i})
for i in range(tenants)]
self.users = []
temporary_endpoints = []
for tenant in self.tenants:
for uid in range(users_per_tenant):
username = "%(tname)s_user_%(uid)d" % {"tname": tenant.name,
"uid": uid}
password = "password"
user = __admin_clients__["keystone"].users.create(username,
password,
"%s@test.com"
% username,
tenant.id)
self.users.append(user)
user_credentials = {"username": username, "password": password,
"tenant_name": tenant.name,
"uri": self.endpoints["uri"]}
temporary_endpoints.append(user_credentials)
return temporary_endpoints
@classmethod
def _delete_nova_resources(cls, nova):
cleanup_utils._delete_servers(nova)
cleanup_utils._delete_keypairs(nova)
cleanup_utils._delete_security_groups(nova)
cleanup_utils._delete_networks(nova)
@classmethod
def _delete_cinder_resources(cls, cinder):
cleanup_utils._delete_volume_transfers(cinder)
cleanup_utils._delete_volumes(cinder)
cleanup_utils._delete_volume_types(cinder)
cleanup_utils._delete_volume_snapshots(cinder)
cleanup_utils._delete_volume_backups(cinder)
@classmethod
def _delete_glance_resources(cls, glance, project_uuid):
cleanup_utils._delete_images(glance, project_uuid)
@classmethod
def _cleanup_with_clients(cls, indexes):
for index in indexes:
clients = __openstack_clients__[index]
try:
cls._delete_nova_resources(clients["nova"])
cls._delete_glance_resources(clients["glance"],
clients["keystone"].project_id)
cls._delete_cinder_resources(clients["cinder"])
except Exception as e:
LOG.info(_('Unable to fully cleanup the cloud: %s') %
(e.message))
def _cleanup_scenario(self, concurrent):
indexes = range(0, len(__openstack_clients__))
chunked_indexes = [indexes[i:i + concurrent]
for i in range(0, len(indexes), concurrent)]
pool = multiprocessing.Pool(concurrent)
for client_indicies in chunked_indexes:
pool.apply_async(utils.async_cleanup, args=(ScenarioRunner,
client_indicies,))
pool.close()
pool.join()
def _delete_temp_tenants_and_users(self):
for user in self.users:
user.delete()
for tenant in self.tenants:
tenant.delete()
def _run_scenario_continuously_for_times(self, cls, method, args,
times, concurrent, timeout):
test_args = [(i, cls, method, args) for i in xrange(times)]
pool = multiprocessing.Pool(concurrent)
iter_result = pool.imap(_run_scenario_loop, test_args)
results = []
for i in range(len(test_args)):
try:
result = iter_result.next(timeout)
except multiprocessing.TimeoutError as e:
result = {"time": timeout, "error": utils.format_exc(e)}
except Exception as e:
result = {"time": None, "error": utils.format_exc(e)}
results.append(result)
pool.close()
pool.join()
return results
def _run_scenario_continuously_for_duration(self, cls, method, args,
duration, concurrent, timeout):
pool = multiprocessing.Pool(concurrent)
run_args = utils.infinite_run_args((cls, method, args))
iter_result = pool.imap(_run_scenario_loop, run_args)
start = time.time()
results_queue = collections.deque([], maxlen=concurrent)
while True:
if time.time() - start > duration * 60:
break
try:
result = iter_result.next(timeout)
except multiprocessing.TimeoutError as e:
result = {"time": timeout, "error": utils.format_exc(e)}
except Exception as e:
result = {"time": None, "error": utils.format_exc(e)}
results_queue.append(result)
results = list(results_queue)
pool.terminate()
pool.join()
return results
def _run_scenario_periodically(self, cls, method, args,
times, period, timeout):
async_results = []
for i in xrange(times):
thread = multiprocessing_pool.ThreadPool(processes=1)
async_result = thread.apply_async(_run_scenario_loop,
((i, cls, method, args),))
async_results.append(async_result)
if i != times - 1:
time.sleep(period * 60)
results = []
for async_result in async_results:
try:
result = async_result.get()
except multiprocessing.TimeoutError as e:
result = {"time": timeout, "error": utils.format_exc(e)}
results.append(result)
return results
def _run_scenario(self, cls, method, args, execution_type, config):
timeout = config.get("timeout", 10000)
if execution_type == "continuous":
concurrent = config.get("active_users", 1)
# NOTE(msdubov): If not specified, perform single scenario run.
if "duration" not in config and "times" not in config:
config["times"] = 1
# Continiously run a benchmark scenario the specified
# amount of times.
if "times" in config:
times = config["times"]
return self._run_scenario_continuously_for_times(
cls, method, args, times, concurrent, timeout)
# Continiously run a scenario as many times as needed
# to fill up the given period of time.
elif "duration" in config:
duration = config["duration"]
return self._run_scenario_continuously_for_duration(
cls, method, args, duration, concurrent, timeout)
elif execution_type == "periodic":
times = config["times"]
period = config["period"]
# Run a benchmark scenario the specified amount of times
# with a specified period between two consecutive launches.
return self._run_scenario_periodically(cls, method, args,
times, period, timeout)
def run(self, name, kwargs):
cls_name, method_name = name.split(".")
cls = base.Scenario.get_by_name(cls_name)
args = kwargs.get('args', {})
init_args = kwargs.get('init', {})
execution_type = kwargs.get('execution', 'continuous')
config = kwargs.get('config', {})
tenants = config.get('tenants', 1)
users_per_tenant = config.get('users_per_tenant', 1)
temp_users = self._create_temp_tenants_and_users(tenants,
users_per_tenant)
global __openstack_clients__, __scenario_context__
# NOTE(msdubov): Call init() with admin openstack clients
cls._clients = __admin_clients__
__scenario_context__ = cls.init(init_args)
# NOTE(msdubov): Launch scenarios with non-admin openstack clients
keys = ["username", "password", "tenant_name", "uri"]
__openstack_clients__ = utils.create_openstack_clients(temp_users,
keys)
results = self._run_scenario(cls, method_name, args,
execution_type, config)
self._cleanup_scenario(config.get("active_users", 1))
self._delete_temp_tenants_and_users()
return results

View File

@@ -13,31 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import multiprocessing
from multiprocessing import pool as multiprocessing_pool
import random
import time
import traceback
import uuid
from rally.benchmark import base
from rally.benchmark import cleanup_utils
from rally import exceptions as rally_exceptions
from rally.openstack.common.gettextutils import _
from rally.openstack.common import log as logging
from rally import osclients
from rally import utils
LOG = logging.getLogger(__name__)
# NOTE(msdubov): These objects are shared between multiple scenario processes.
__openstack_clients__ = []
__admin_clients__ = {}
__scenario_context__ = {}
def resource_is(status):
return lambda resource: resource.status == status
@@ -98,45 +80,22 @@ def false(resource):
return False
def _async_cleanup(cls, indicies):
def async_cleanup(cls, indicies):
cls._cleanup_with_clients(indicies)
def _format_exc(exc):
def format_exc(exc):
return [str(type(exc)), str(exc), traceback.format_exc()]
def _infinite_run_args(args):
def infinite_run_args(args):
i = 0
while True:
yield (i,) + args
i += 1
def _run_scenario_loop(args):
i, cls, method_name, kwargs = args
LOG.info("ITER: %s" % i)
# NOTE(msdubov): Each scenario run uses a random openstack client
# from a predefined set to act from different users.
cls._clients = random.choice(__openstack_clients__)
cls._admin_clients = __admin_clients__
cls._context = __scenario_context__
cls.idle_time = 0
try:
with utils.Timer() as timer:
getattr(cls, method_name)(**kwargs)
except Exception as e:
return {"time": timer.duration() - cls.idle_time,
"idle_time": cls.idle_time, "error": _format_exc(e)}
return {"time": timer.duration() - cls.idle_time,
"idle_time": cls.idle_time, "error": None}
def _create_openstack_clients(users_endpoints, keys):
def create_openstack_clients(users_endpoints, keys):
# NOTE(msdubov): Creating here separate openstack clients for each of
# the temporary users involved in benchmarking.
client_managers = [osclients.Clients(*[credentials[k] for k in keys])
@@ -152,234 +111,3 @@ def _create_openstack_clients(users_endpoints, keys):
]
return clients
class ScenarioRunner(object):
"""Tool that gets and runs one Scenario."""
def __init__(self, task, cloud_config):
self.task = task
self.endpoints = cloud_config
global __admin_clients__
keys = ["admin_username", "admin_password", "admin_tenant_name", "uri"]
__admin_clients__ = _create_openstack_clients([self.endpoints],
keys)[0]
base.Scenario.register()
def _create_temp_tenants_and_users(self, tenants, users_per_tenant):
run_id = str(uuid.uuid4())
self.tenants = [__admin_clients__["keystone"].tenants.create(
"temp_%(rid)s_tenant_%(iter)i" % {"rid": run_id,
"iter": i})
for i in range(tenants)]
self.users = []
temporary_endpoints = []
for tenant in self.tenants:
for uid in range(users_per_tenant):
username = "%(tname)s_user_%(uid)d" % {"tname": tenant.name,
"uid": uid}
password = "password"
user = __admin_clients__["keystone"].users.create(username,
password,
"%s@test.com"
% username,
tenant.id)
self.users.append(user)
user_credentials = {"username": username, "password": password,
"tenant_name": tenant.name,
"uri": self.endpoints["uri"]}
temporary_endpoints.append(user_credentials)
return temporary_endpoints
@classmethod
def _delete_nova_resources(cls, nova):
cleanup_utils._delete_servers(nova)
cleanup_utils._delete_keypairs(nova)
cleanup_utils._delete_security_groups(nova)
cleanup_utils._delete_networks(nova)
@classmethod
def _delete_cinder_resources(cls, cinder):
cleanup_utils._delete_volume_transfers(cinder)
cleanup_utils._delete_volumes(cinder)
cleanup_utils._delete_volume_types(cinder)
cleanup_utils._delete_volume_snapshots(cinder)
cleanup_utils._delete_volume_backups(cinder)
@classmethod
def _delete_glance_resources(cls, glance, project_uuid):
cleanup_utils._delete_images(glance, project_uuid)
@classmethod
def _cleanup_with_clients(cls, indexes):
for index in indexes:
clients = __openstack_clients__[index]
try:
cls._delete_nova_resources(clients["nova"])
cls._delete_glance_resources(clients["glance"],
clients["keystone"].project_id)
cls._delete_cinder_resources(clients["cinder"])
except Exception as e:
LOG.info(_('Unable to fully cleanup the cloud: %s') %
(e.message))
def _cleanup_scenario(self, concurrent):
indexes = range(0, len(__openstack_clients__))
chunked_indexes = [indexes[i:i + concurrent]
for i in range(0, len(indexes), concurrent)]
pool = multiprocessing.Pool(concurrent)
for client_indicies in chunked_indexes:
pool.apply_async(_async_cleanup, args=(ScenarioRunner,
client_indicies,))
pool.close()
pool.join()
def _delete_temp_tenants_and_users(self):
for user in self.users:
user.delete()
for tenant in self.tenants:
tenant.delete()
def _run_scenario_continuously_for_times(self, cls, method, args,
times, concurrent, timeout):
test_args = [(i, cls, method, args) for i in xrange(times)]
pool = multiprocessing.Pool(concurrent)
iter_result = pool.imap(_run_scenario_loop, test_args)
results = []
for i in range(len(test_args)):
try:
result = iter_result.next(timeout)
except multiprocessing.TimeoutError as e:
result = {"time": timeout, "error": _format_exc(e)}
except Exception as e:
result = {"time": None, "error": _format_exc(e)}
results.append(result)
pool.close()
pool.join()
return results
def _run_scenario_continuously_for_duration(self, cls, method, args,
duration, concurrent, timeout):
pool = multiprocessing.Pool(concurrent)
run_args = _infinite_run_args((cls, method, args))
iter_result = pool.imap(_run_scenario_loop, run_args)
start = time.time()
results_queue = collections.deque([], maxlen=concurrent)
while True:
if time.time() - start > duration * 60:
break
try:
result = iter_result.next(timeout)
except multiprocessing.TimeoutError as e:
result = {"time": timeout, "error": _format_exc(e)}
except Exception as e:
result = {"time": None, "error": _format_exc(e)}
results_queue.append(result)
results = list(results_queue)
pool.terminate()
pool.join()
return results
def _run_scenario_periodically(self, cls, method, args,
times, period, timeout):
async_results = []
for i in xrange(times):
thread = multiprocessing_pool.ThreadPool(processes=1)
async_result = thread.apply_async(_run_scenario_loop,
((i, cls, method, args),))
async_results.append(async_result)
if i != times - 1:
time.sleep(period * 60)
results = []
for async_result in async_results:
try:
result = async_result.get()
except multiprocessing.TimeoutError as e:
result = {"time": timeout, "error": _format_exc(e)}
results.append(result)
return results
def _run_scenario(self, cls, method, args, execution_type, config):
timeout = config.get("timeout", 10000)
if execution_type == "continuous":
concurrent = config.get("active_users", 1)
# NOTE(msdubov): If not specified, perform single scenario run.
if "duration" not in config and "times" not in config:
config["times"] = 1
# Continiously run a benchmark scenario the specified
# amount of times.
if "times" in config:
times = config["times"]
return self._run_scenario_continuously_for_times(
cls, method, args, times, concurrent, timeout)
# Continiously run a scenario as many times as needed
# to fill up the given period of time.
elif "duration" in config:
duration = config["duration"]
return self._run_scenario_continuously_for_duration(
cls, method, args, duration, concurrent, timeout)
elif execution_type == "periodic":
times = config["times"]
period = config["period"]
# Run a benchmark scenario the specified amount of times
# with a specified period between two consecutive launches.
return self._run_scenario_periodically(cls, method, args,
times, period, timeout)
def run(self, name, kwargs):
cls_name, method_name = name.split(".")
cls = base.Scenario.get_by_name(cls_name)
args = kwargs.get('args', {})
init_args = kwargs.get('init', {})
execution_type = kwargs.get('execution', 'continuous')
config = kwargs.get('config', {})
tenants = config.get('tenants', 1)
users_per_tenant = config.get('users_per_tenant', 1)
temp_users = self._create_temp_tenants_and_users(tenants,
users_per_tenant)
global __openstack_clients__, __scenario_context__
# NOTE(msdubov): Call init() with admin openstack clients
cls._clients = __admin_clients__
__scenario_context__ = cls.init(init_args)
# NOTE(msdubov): Launch scenarios with non-admin openstack clients
keys = ["username", "password", "tenant_name", "uri"]
__openstack_clients__ = _create_openstack_clients(temp_users, keys)
results = self._run_scenario(cls, method_name, args,
execution_type, config)
self._cleanup_scenario(config.get("active_users", 1))
self._delete_temp_tenants_and_users()
return results

View File

@@ -60,7 +60,7 @@ class NovaServersTestCase(test.TestCase):
temp_keys = ["username", "password", "tenant_name", "uri"]
users_endpoints = [dict(zip(temp_keys, temp_keys))]
servers.NovaServers._clients = butils._create_openstack_clients(
servers.NovaServers._clients = butils.create_openstack_clients(
users_endpoints, temp_keys)[0]
mock_boot.return_value = object()

View File

@@ -66,7 +66,7 @@ class NovaScenarioTestCase(test.TestCase):
temp_keys = ["username", "password", "tenant_name", "uri"]
users_endpoints = [dict(zip(temp_keys, temp_keys))]
utils.NovaScenario._clients = butils.\
_create_openstack_clients(users_endpoints, temp_keys)[0]
create_openstack_clients(users_endpoints, temp_keys)[0]
utils.utils = mock_rally_utils
utils.bench_utils.get_from_manager = lambda: get_from_mgr

View File

@@ -17,7 +17,7 @@
import mock
import multiprocessing
from rally.benchmark import utils
from rally.benchmark import runner
from rally import test
from tests import fakes
@@ -48,19 +48,19 @@ class ScenarioTestCase(test.TestCase):
def test_init_calls_register(self):
with mock.patch("rally.benchmark.utils.osclients") as mock_osclients:
mock_osclients.Clients.return_value = fakes.FakeClients()
with mock.patch("rally.benchmark.utils.base") as mock_base:
utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
with mock.patch("rally.benchmark.runner.base") as mock_base:
runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
self.assertEqual(mock_base.mock_calls,
[mock.call.Scenario.register()])
def test_create_temp_tenants_and_users(self):
with mock.patch("rally.benchmark.utils.osclients") as mock_osclients:
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
tenants = 10
users_per_tenant = 5
endpoints = runner._create_temp_tenants_and_users(tenants,
users_per_tenant)
endpoints = srunner._create_temp_tenants_and_users(
tenants, users_per_tenant)
self.assertEqual(len(endpoints), tenants * users_per_tenant)
endpoint_keys = set(["username", "password", "tenant_name", "uri"])
for endpoint in endpoints:
@@ -69,48 +69,48 @@ class ScenarioTestCase(test.TestCase):
def test_run_scenario(self):
with mock.patch("rally.benchmark.utils.osclients") as mock_osclients:
mock_osclients.Clients.return_value = fakes.FakeClients()
with mock.patch("rally.benchmark.utils.utils") as mock_utils:
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
utils.__openstack_clients__ = ["client"]
with mock.patch("rally.benchmark.runner.rutils") as mock_utils:
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner.__openstack_clients__ = ["client"]
active_users = 2
times = 3
duration = 0.01
mock_utils.Timer = fakes.FakeTimer
results = runner._run_scenario(fakes.FakeScenario,
"do_it", {}, "continuous",
{"times": times,
"active_users": active_users,
"timeout": 2})
results = srunner._run_scenario(fakes.FakeScenario,
"do_it", {}, "continuous",
{"times": times,
"active_users": active_users,
"timeout": 2})
expected = [{"time": 10, "idle_time": 0, "error": None}
for i in range(times)]
self.assertEqual(results, expected)
results = runner._run_scenario(fakes.FakeScenario,
"do_it", {}, "continuous",
{"duration": duration,
"active_users": active_users,
"timeout": 2})
results = srunner._run_scenario(fakes.FakeScenario,
"do_it", {}, "continuous",
{"duration": duration,
"active_users": active_users,
"timeout": 2})
expected = [{"time": 10, "idle_time": 0, "error": None}
for i in range(active_users)]
self.assertEqual(results, expected)
@mock.patch("rally.benchmark.utils.osclients")
@mock.patch("multiprocessing.pool.IMapIterator.next")
@mock.patch("rally.benchmark.utils.time.time")
@mock.patch("rally.benchmark.runner.time.time")
def test_run_scenario_timeout(self, mock_time, mock_next, mock_osclients):
mock_time.side_effect = [1, 2, 3, 10]
mock_next.side_effect = multiprocessing.TimeoutError()
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
utils.__openstack_clients__ = ["client"]
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner.__openstack_clients__ = ["client"]
times = 4
active_users = 2
results = runner._run_scenario(fakes.FakeScenario,
"too_long", {}, "continuous",
{"times": times,
"active_users": active_users,
"timeout": 0.01})
results = srunner._run_scenario(fakes.FakeScenario,
"too_long", {}, "continuous",
{"times": times,
"active_users": active_users,
"timeout": 0.01})
self.assertEqual(len(results), times)
for r in results:
self.assertEqual(r['time'], 0.01)
@@ -118,11 +118,11 @@ class ScenarioTestCase(test.TestCase):
str(multiprocessing.TimeoutError))
duration = 0.1
results = runner._run_scenario(fakes.FakeScenario,
"too_long", {}, "continuous",
{"duration": duration,
"active_users": active_users,
"timeout": 0.01})
results = srunner._run_scenario(fakes.FakeScenario,
"too_long", {}, "continuous",
{"duration": duration,
"active_users": active_users,
"timeout": 0.01})
self.assertEqual(len(results), active_users)
for r in results:
self.assertEqual(r['time'], 0.01)
@@ -132,31 +132,31 @@ class ScenarioTestCase(test.TestCase):
def test_run_scenario_exception_inside_test(self):
with mock.patch("rally.benchmark.utils.osclients") as mock_osclients:
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
utils.__openstack_clients__ = ["client"]
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner.__openstack_clients__ = ["client"]
times = 1
duration = 0.01
active_users = 2
with mock.patch("rally.benchmark.utils.utils") as mock_utils:
with mock.patch("rally.benchmark.runner.rutils") as mock_utils:
mock_utils.Timer = fakes.FakeTimer
results = runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", {},
"continuous",
{"times": times,
"active_users": active_users,
"timeout": 1})
results = srunner._run_scenario(fakes.FakeScenario,
"something_went_wrong", {},
"continuous",
{"times": times,
"active_users": active_users,
"timeout": 1})
self.assertEqual(len(results), times)
for r in results:
self.assertEqual(r['time'], 10)
self.assertEqual(r['error'][:2],
[str(Exception), "Something went wrong"])
results = runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", {},
"continuous",
{"duration": duration,
"active_users": active_users,
"timeout": 1})
results = srunner._run_scenario(fakes.FakeScenario,
"something_went_wrong", {},
"continuous",
{"duration": duration,
"active_users": active_users,
"timeout": 1})
self.assertEqual(len(results), active_users)
for r in results:
self.assertEqual(r['time'], 10)
@@ -166,25 +166,25 @@ class ScenarioTestCase(test.TestCase):
def test_run_scenario_exception_outside_test(self):
pass
@mock.patch("rally.benchmark.utils.multiprocessing")
@mock.patch("rally.benchmark.runner.multiprocessing")
@mock.patch("rally.benchmark.utils.osclients")
def test_run_scenario_continuously_for_times(self, mock_osclients,
mock_multi):
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
utils.__openstack_clients__ = ["client"]
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner.__openstack_clients__ = ["client"]
times = 3
active_users = 4
timeout = 5
mock_multi.Pool = mock.MagicMock()
runner._run_scenario_continuously_for_times(fakes.FakeScenario,
"do_it", {},
times, active_users,
timeout)
srunner._run_scenario_continuously_for_times(fakes.FakeScenario,
"do_it", {},
times, active_users,
timeout)
expect = [
mock.call(active_users),
mock.call().imap(
utils._run_scenario_loop,
runner._run_scenario_loop,
[(i, fakes.FakeScenario, "do_it", {})
for i in xrange(times)]
)
@@ -196,43 +196,43 @@ class ScenarioTestCase(test.TestCase):
])
self.assertEqual(mock_multi.Pool.mock_calls, expect)
@mock.patch("rally.benchmark.utils._infinite_run_args")
@mock.patch("rally.benchmark.utils.multiprocessing")
@mock.patch("rally.benchmark.utils.infinite_run_args")
@mock.patch("rally.benchmark.runner.multiprocessing")
@mock.patch("rally.benchmark.utils.osclients")
def test_run_scenario_continuously_for_duration(self, mock_osclients,
mock_multi, mock_generate):
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
utils.__openstack_clients__ = ["client"]
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner.__openstack_clients__ = ["client"]
duration = 0
active_users = 4
timeout = 5
mock_multi.Pool = mock.MagicMock()
mock_generate.return_value = {}
runner._run_scenario_continuously_for_duration(fakes.FakeScenario,
"do_it", {}, duration,
active_users, timeout)
srunner._run_scenario_continuously_for_duration(fakes.FakeScenario,
"do_it", {}, duration,
active_users, timeout)
expect = [
mock.call(active_users),
mock.call().imap(utils._run_scenario_loop, {}),
mock.call().imap(runner._run_scenario_loop, {}),
mock.call().terminate(),
mock.call().join()
]
self.assertEqual(mock_multi.Pool.mock_calls, expect)
@mock.patch("rally.benchmark.utils._run_scenario_loop")
@mock.patch("rally.benchmark.utils.time.sleep")
@mock.patch("rally.benchmark.runner._run_scenario_loop")
@mock.patch("rally.benchmark.runner.time.sleep")
@mock.patch("rally.benchmark.utils.osclients")
def test_run_scenario_periodically(self, mock_osclients,
mock_sleep, mock_run_scenario_loop):
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
utils.__openstack_clients__ = ["client"]
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner.__openstack_clients__ = ["client"]
times = 3
period = 4
timeout = 5
runner._run_scenario_periodically(fakes.FakeScenario, "do_it", {},
times, period, timeout)
srunner._run_scenario_periodically(fakes.FakeScenario, "do_it", {},
times, period, timeout)
expected = [mock.call((i, fakes.FakeScenario, "do_it", {}))
for i in xrange(times)]
@@ -242,92 +242,93 @@ class ScenarioTestCase(test.TestCase):
mock_sleep.has_calls(expected)
@mock.patch("rally.benchmark.utils.osclients")
@mock.patch("rally.benchmark.utils.base")
@mock.patch("rally.benchmark.runner.base")
@mock.patch("rally.benchmark.utils.osclients")
def test_run_continuous(self, mock_osclients, mock_base, mock_clients):
FakeScenario = mock.MagicMock()
FakeScenario.init = mock.MagicMock(return_value={})
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner._run_scenario_continuously_for_times = \
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
srunner._run_scenario_continuously_for_times = \
mock.MagicMock(return_value="result")
runner._run_scenario_continuously_for_duration = \
srunner._run_scenario_continuously_for_duration = \
mock.MagicMock(return_value="result")
runner._create_temp_tenants_and_users = mock.MagicMock(
srunner._create_temp_tenants_and_users = mock.MagicMock(
return_value=[])
runner._delete_temp_tenants_and_users = mock.MagicMock()
srunner._delete_temp_tenants_and_users = mock.MagicMock()
mock_base.Scenario.get_by_name = \
mock.MagicMock(return_value=FakeScenario)
mock_clients.return_value = ["client"]
result = runner._run_scenario(FakeScenario, "do_it", {"a": 1},
"continuous", {"times": 2,
"active_users": 3,
"timeout": 1})
result = srunner._run_scenario(FakeScenario, "do_it", {"a": 1},
"continuous", {"times": 2,
"active_users": 3,
"timeout": 1})
self.assertEqual(result, "result")
runner._run_scenario_continuously_for_times.assert_called_once_with(
srunner._run_scenario_continuously_for_times.assert_called_once_with(
FakeScenario, "do_it", {"a": 1}, 2, 3, 1)
result = runner._run_scenario(FakeScenario, "do_it", {"a": 1},
"continuous", {"duration": 2,
"active_users": 3,
"timeout": 1})
result = srunner._run_scenario(FakeScenario, "do_it", {"a": 1},
"continuous", {"duration": 2,
"active_users": 3,
"timeout": 1})
self.assertEqual(result, "result")
runner._run_scenario_continuously_for_duration.assert_called_once_with(
FakeScenario, "do_it", {"a": 1}, 2, 3, 1)
srunner._run_scenario_continuously_for_duration.\
assert_called_once_with(FakeScenario, "do_it", {"a": 1}, 2, 3, 1)
@mock.patch("rally.benchmark.utils.osclients")
@mock.patch("rally.benchmark.utils.base")
@mock.patch("rally.benchmark.runner.base")
@mock.patch("rally.benchmark.utils.osclients")
def test_run_periodic(self, mock_osclients, mock_base, mock_clients):
FakeScenario = mock.MagicMock()
FakeScenario.init = mock.MagicMock(return_value={})
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner._run_scenario_periodically = mock.MagicMock(
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
srunner._run_scenario_periodically = mock.MagicMock(
return_value="result")
runner._create_temp_tenants_and_users = mock.MagicMock(
srunner._create_temp_tenants_and_users = mock.MagicMock(
return_value=[])
runner._delete_temp_tenants_and_users = mock.MagicMock()
srunner._delete_temp_tenants_and_users = mock.MagicMock()
mock_base.Scenario.get_by_name = \
mock.MagicMock(return_value=FakeScenario)
mock_clients.return_value = ["client"]
result = runner._run_scenario(FakeScenario, "do_it", {"a": 1},
"periodic", {"times": 2, "period": 3,
"timeout": 1})
result = srunner._run_scenario(FakeScenario, "do_it", {"a": 1},
"periodic", {"times": 2, "period": 3,
"timeout": 1})
self.assertEqual(result, "result")
runner._run_scenario_periodically.assert_called_once_with(
srunner._run_scenario_periodically.assert_called_once_with(
FakeScenario, "do_it", {"a": 1}, 2, 3, 1)
@mock.patch("rally.benchmark.utils._create_openstack_clients")
@mock.patch("rally.benchmark.utils.base")
@mock.patch("rally.benchmark.utils.create_openstack_clients")
@mock.patch("rally.benchmark.runner.base")
@mock.patch("rally.benchmark.utils.osclients")
def test_run(self, mock_osclients, mock_base, mock_clients):
FakeScenario = mock.MagicMock()
FakeScenario.init = mock.MagicMock(return_value={})
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner._run_scenario = mock.MagicMock(return_value="result")
runner._create_temp_tenants_and_users = mock.MagicMock(
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
srunner._run_scenario = mock.MagicMock(return_value="result")
srunner._create_temp_tenants_and_users = mock.MagicMock(
return_value=[])
runner._delete_temp_tenants_and_users = mock.MagicMock()
srunner._delete_temp_tenants_and_users = mock.MagicMock()
mock_base.Scenario.get_by_name = \
mock.MagicMock(return_value=FakeScenario)
result = runner.run("FakeScenario.do_it", {})
result = srunner.run("FakeScenario.do_it", {})
self.assertEqual(result, "result")
runner.run("FakeScenario.do_it",
{"args": {"a": 1}, "init": {"arg": 1},
"config": {"timeout": 1, "times": 2, "active_users": 3,
"tenants": 5, "users_per_tenant": 2}})
runner.run("FakeScenario.do_it",
{"args": {"a": 1}, "init": {"fake": "arg"},
"execution_type": "continuous",
"config": {"timeout": 1, "duration": 40, "active_users": 3,
"tenants": 5, "users_per_tenant": 2}})
srunner.run("FakeScenario.do_it",
{"args": {"a": 1}, "init": {"arg": 1},
"config": {"timeout": 1, "times": 2, "active_users": 3,
"tenants": 5, "users_per_tenant": 2}})
srunner.run("FakeScenario.do_it",
{"args": {"a": 1}, "init": {"fake": "arg"},
"execution_type": "continuous",
"config": {"timeout": 1, "duration": 40,
"active_users": 3, "tenants": 5,
"users_per_tenant": 2}})
expected = [
mock.call(FakeScenario, "do_it", {}, "continuous", {}),
@@ -338,14 +339,14 @@ class ScenarioTestCase(test.TestCase):
{"timeout": 1, "duration": 40, "active_users": 3,
"tenants": 5, "users_per_tenant": 2})
]
self.assertEqual(runner._run_scenario.mock_calls, expected)
self.assertEqual(srunner._run_scenario.mock_calls, expected)
expected = [
mock.call(1, 1),
mock.call(5, 2),
mock.call(5, 2)
]
self.assertEqual(runner._create_temp_tenants_and_users.mock_calls,
self.assertEqual(srunner._create_temp_tenants_and_users.mock_calls,
expected)
expected = [
@@ -355,8 +356,8 @@ class ScenarioTestCase(test.TestCase):
]
self.assertEqual(FakeScenario.mock_calls, expected)
@mock.patch("rally.benchmark.utils._create_openstack_clients")
@mock.patch("rally.benchmark.utils.base")
@mock.patch("rally.benchmark.utils.create_openstack_clients")
@mock.patch("rally.benchmark.runner.base")
@mock.patch("rally.benchmark.utils.osclients")
@mock.patch("multiprocessing.Pool")
def test_generic_cleanup(self, mock_pool, mock_osclients,
@@ -376,11 +377,11 @@ class ScenarioTestCase(test.TestCase):
]
mock_clients.return_value = clients
runner = utils.ScenarioRunner(mock.MagicMock(), self.fake_kw)
runner._run_scenario = mock.MagicMock(return_value="result")
runner._create_temp_tenants_and_users = mock.MagicMock(
srunner = runner.ScenarioRunner(mock.MagicMock(), self.fake_kw)
srunner._run_scenario = mock.MagicMock(return_value="result")
srunner._create_temp_tenants_and_users = mock.MagicMock(
return_value=[])
runner._delete_temp_tenants_and_users = mock.MagicMock()
srunner._delete_temp_tenants_and_users = mock.MagicMock()
mock_base.Scenario.get_by_name = \
mock.MagicMock(return_value=FakeScenario)
@@ -404,10 +405,10 @@ class ScenarioTestCase(test.TestCase):
mock_pool.return_value = MockedPool()
runner.run("FakeScenario.do_it",
{"args": {"a": 1}, "init": {"arg": 1},
"config": {"timeout": 1, "times": 2, "active_users": 3,
"tenants": 5, "users_per_tenant": 2}})
srunner.run("FakeScenario.do_it",
{"args": {"a": 1}, "init": {"arg": 1},
"config": {"timeout": 1, "times": 2, "active_users": 3,
"tenants": 5, "users_per_tenant": 2}})
def _assert_purged(manager, resource_type):
resources = manager.list()

View File

@@ -124,7 +124,7 @@ class TestEngineTestCase(test.TestCase):
self.assertEqual(tester.endpoints,
self.valid_cloud_config['identity'])
@mock.patch("rally.benchmark.utils.ScenarioRunner.run")
@mock.patch("rally.benchmark.runner.ScenarioRunner.run")
@mock.patch("rally.benchmark.utils.osclients")
def test_run(self, mock_osclients, mock_run):
mock_osclients.Clients.return_value = fakes.FakeClients()
@@ -133,7 +133,7 @@ class TestEngineTestCase(test.TestCase):
with tester.bind(self.valid_cloud_config):
tester.run()
@mock.patch("rally.benchmark.utils.ScenarioRunner.run")
@mock.patch("rally.benchmark.runner.ScenarioRunner.run")
@mock.patch("rally.benchmark.utils.osclients")
def test_task_status_basic_chain(self, mock_osclients, mock_scenario_run):
fake_task = mock.MagicMock()
@@ -161,7 +161,7 @@ class TestEngineTestCase(test.TestCase):
fake_task.mock_calls)
self.assertEqual(mock_calls, expected)
@mock.patch("rally.benchmark.utils.ScenarioRunner.run")
@mock.patch("rally.benchmark.runner.ScenarioRunner.run")
@mock.patch("rally.benchmark.utils.osclients")
def test_task_status_failed(self, mock_osclients, mock_scenario_run):
fake_task = mock.MagicMock()

View File

@@ -83,7 +83,7 @@ class APITestCase(test.TestCase):
'endpoint': self.endpoint,
}
@mock.patch('rally.benchmark.engine.utils.ScenarioRunner')
@mock.patch('rally.benchmark.engine.runner.ScenarioRunner')
@mock.patch('rally.objects.deploy.db.deployment_get')
@mock.patch('rally.objects.task.db.task_result_create')
@mock.patch('rally.objects.task.db.task_update')