Merge "Save raw task results in chunks"
commit e54d78d486
@@ -111,6 +111,10 @@
 # value)
 #openstack_client_http_timeout = 180.0
 
+# Size of raw result chunk in iterations (integer value)
+# Minimum value: 1
+#raw_result_chunk_size = 1000
+
 
 [benchmark]
 
@@ -242,12 +246,12 @@
 # the status. (floating point value)
 #magnum_cluster_create_prepoll_delay = 5.0
 
-# Time(in sec) to wait for magnum cluster to be created. (floating point
-# value)
+# Time(in sec) to wait for magnum cluster to be created. (floating
+# point value)
 #magnum_cluster_create_timeout = 1200.0
 
-# Time interval(in sec) between checks when waiting for cluster creation.
-# (floating point value)
+# Time interval(in sec) between checks when waiting for cluster
+# creation. (floating point value)
 #magnum_cluster_create_poll_interval = 1.0
 
 # Delay between creating Manila share and polling for its status.
@@ -556,6 +556,17 @@
         failure_rate:
           max: 0
 
+    -
+      args:
+        sleep: 0
+      runner:
+        type: "constant"
+        times: 4500
+        concurrency: 20
+      sla:
+        failure_rate:
+          max: 0
+
   Dummy.dummy_exception:
     -
       args:
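Note: the new Dummy.dummy workload above runs times: 4500, so with the default raw_result_chunk_size of 1000 (introduced further down in rally.task.engine) a gate run exercises the chunked path: four full chunks plus a 500-iteration remainder. A quick standalone sketch of that arithmetic (illustrative only, not part of the commit):

    def chunk_sizes(total_iterations, chunk_size=1000):
        # mirror the engine loop: cut full chunks, flush the remainder at the end
        sizes = []
        while total_iterations >= chunk_size:
            sizes.append(chunk_size)
            total_iterations -= chunk_size
        if total_iterations:
            sizes.append(total_iterations)
        return sizes

    print(chunk_sizes(4500))  # [1000, 1000, 1000, 1000, 500]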
@@ -257,15 +257,17 @@ def workload_create(task_uuid, subtask_uuid, key):
     return get_impl().workload_create(task_uuid, subtask_uuid, key)
 
 
-def workload_data_create(task_uuid, workload_uuid, data):
+def workload_data_create(task_uuid, workload_uuid, chunk_order, data):
     """Create a workload data.
 
     :param task_uuid: string with UUID of Task instance.
     :param workload_uuid: string with UUID of Workload instance.
+    :param chunk_order: ordinal index of workload data
    :param data: dict with record values on the workload data.
    :returns: a dict with data on the workload data.
    """
-    return get_impl().workload_data_create(task_uuid, workload_uuid, data)
+    return get_impl().workload_data_create(task_uuid, workload_uuid,
+                                           chunk_order, data)
 
 
 def workload_set_results(workload_uuid, data):
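Note: workload_data_create() now takes the chunk ordinal explicitly, so callers persist each chunk under its position rather than a single implicit row. A minimal usage sketch modeled on the updated unit tests further down (the UUIDs are placeholders, and the import assumes the usual `from rally.common import db` seen elsewhere in the tree):

    from rally.common import db

    first_chunk = [{"timestamp": 1, "duration": 1}]
    second_chunk = [{"timestamp": 2, "duration": 1}]
    db.workload_data_create("task-uuid", "workload-uuid", 0, {"raw": first_chunk})
    db.workload_data_create("task-uuid", "workload-uuid", 1, {"raw": second_chunk})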
@@ -242,7 +242,10 @@ class Connection(object):
             "verification_log": json.dumps(task.validation_result)
         }
 
-    def _make_old_task_result(self, workload, workload_data):
+    def _make_old_task_result(self, workload, workload_data_list):
+        raw_data = [data
+                    for workload_data in workload_data_list
+                    for data in workload_data.chunk_data["raw"]]
         return {
             "id": workload.id,
             "task_uuid": workload.task_uuid,
@@ -260,7 +263,7 @@ class Connection(object):
                 }
             },
             "data": {
-                "raw": workload_data.chunk_data["raw"],
+                "raw": raw_data,
                 "load_duration": workload.load_duration,
                 "full_duration": workload.full_duration,
                 "sla": workload.sla_results["sla"],
@@ -268,6 +271,11 @@ class Connection(object):
             }
         }
 
+    def _task_workload_data_get_all(self, workload_uuid):
+        return (self.model_query(models.WorkloadData).
+                filter_by(workload_uuid=workload_uuid).
+                order_by(models.WorkloadData.chunk_order.asc()))
+
     # @db_api.serialize
     def task_get(self, uuid):
         task = self._task_get(uuid)
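Note: _task_workload_data_get_all() returns the WorkloadData rows ordered by chunk_order, and _make_old_task_result() flattens them with a double comprehension, so the legacy single-"raw"-list view is rebuilt in iteration order. A toy illustration of that flattening (plain dicts stand in for WorkloadData rows):

    chunks = [{"raw": [1, 2]}, {"raw": [3]}, {"raw": [4, 5]}]
    raw_data = [item
                for chunk in chunks
                for item in chunk["raw"]]
    assert raw_data == [1, 2, 3, 4, 5]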
@@ -407,13 +415,11 @@ class Connection(object):
                      filter_by(task_uuid=uuid).all())
 
         for workload in workloads:
-            workload_data = (self.model_query(models.WorkloadData).
-                             filter_by(task_uuid=uuid,
-                                       workload_uuid=workload.uuid).
-                             first())
+            workload_data_list = self._task_workload_data_get_all(
+                workload.uuid)
 
             results.append(
-                self._make_old_task_result(workload, workload_data))
+                self._make_old_task_result(workload, workload_data_list))
 
         return results
 
@@ -451,7 +457,8 @@ class Connection(object):
         return workload
 
     @db_api.serialize
-    def workload_data_create(self, task_uuid, workload_uuid, data):
+    def workload_data_create(self, task_uuid, workload_uuid, chunk_order,
+                             data):
         workload_data = models.WorkloadData(task_uuid=task_uuid,
                                             workload_uuid=workload_uuid)
 
@@ -485,7 +492,7 @@ class Connection(object):
         workload_data.update({
             "task_uuid": task_uuid,
             "workload_uuid": workload_uuid,
-            "chunk_order": 0,
+            "chunk_order": chunk_order,
             "iteration_count": iter_count,
             "failed_iteration_count": failed_iter_count,
             "chunk_data": {"raw": raw_data},
@@ -503,10 +510,11 @@ class Connection(object):
         workload = self.model_query(models.Workload).filter_by(
             uuid=workload_uuid).first()
 
-        workload_data = self.model_query(models.WorkloadData).filter_by(
-            workload_uuid=workload_uuid).first()
+        workload_data_list = self._task_workload_data_get_all(workload.uuid)
 
-        raw_data = workload_data.chunk_data.get("raw", [])
+        raw_data = [raw
+                    for workload_data in workload_data_list
+                    for raw in workload_data.chunk_data["raw"]]
         iter_count = len(raw_data)
 
         failed_iter_count = 0
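Note: workload_set_results() likewise recomputes its totals over every stored chunk rather than a single row. Roughly (a sketch; the failed-iteration predicate is an assumption here, since the hunk only shows iter_count):

    chunk_data = [{"raw": [{"error": None}, {"error": "boom"}]},
                  {"raw": [{"error": None}]}]
    raw_data = [r for chunk in chunk_data for r in chunk["raw"]]
    iter_count = len(raw_data)                                      # 3 across both chunks
    failed_iter_count = sum(1 for r in raw_data if r.get("error"))  # assumed check -> 1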
@@ -569,9 +569,10 @@ class Workload(object):
     def __getitem__(self, key):
         return self.workload[key]
 
-    def add_workload_data(self, workload_data):
+    def add_workload_data(self, chunk_order, workload_data):
         db.workload_data_create(self.workload["task_uuid"],
-                                self.workload["uuid"], workload_data)
+                                self.workload["uuid"], chunk_order,
+                                workload_data)
 
     def set_results(self, data):
         db.workload_set_results(self.workload["uuid"], data)
@@ -34,13 +34,15 @@ from rally.plugins.openstack.scenarios.vm import utils as vm_utils
 from rally.plugins.openstack.scenarios.watcher import utils as watcher_utils
 from rally.plugins.openstack.verification.tempest import config as tempest_conf
 from rally.plugins.openstack.wrappers import glance as glance_utils
+from rally.task import engine
 
 
 def list_opts():
     return [
         ("DEFAULT",
          itertools.chain(logging.DEBUG_OPTS,
-                         osclients.OSCLIENTS_OPTS)),
+                         osclients.OSCLIENTS_OPTS,
+                         engine.TASK_ENGINE_OPTS)),
         ("benchmark",
          itertools.chain(cinder_utils.CINDER_BENCHMARK_OPTS,
                          ec2_utils.EC2_BENCHMARK_OPTS,
@@ -20,6 +20,7 @@ import time
 import traceback
 
 import jsonschema
+from oslo_config import cfg
 import six
 
 from rally.common.i18n import _
@@ -40,6 +41,14 @@ from rally.task import sla
 
 LOG = logging.getLogger(__name__)
 
+CONF = cfg.CONF
+
+TASK_ENGINE_OPTS = [
+    cfg.IntOpt("raw_result_chunk_size", default=1000, min=1,
+               help="Size of raw result chunk in iterations"),
+]
+CONF.register_opts(TASK_ENGINE_OPTS)
+
 
 class ResultConsumer(object):
     """ResultConsumer class stores results from ScenarioRunner, checks SLA.
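Note: this is a plain oslo.config integer option; the min=1 bound is what produces the "# Minimum value: 1" line in the regenerated sample config at the top of this change, and listing TASK_ENGINE_OPTS in list_opts() (previous hunk) is what exposes it to the sample generator. A self-contained sketch of the same pattern, with an illustrative option-list name:

    from oslo_config import cfg

    CONF = cfg.CONF
    opts = [
        cfg.IntOpt("raw_result_chunk_size", default=1000, min=1,
                   help="Size of raw result chunk in iterations"),
    ]
    CONF.register_opts(opts)

    print(CONF.raw_result_chunk_size)  # 1000 unless overridden in rally.conf
    # values below min (e.g. 0) are rejected by oslo.config when set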
@@ -69,6 +78,7 @@ class ResultConsumer(object):
         self.runner = runner
         self.load_started_at = float("inf")
         self.load_finished_at = 0
+        self.workload_data_count = 0
 
         self.sla_checker = sla.SLAChecker(key["kw"])
         self.hook_executor = hook.HookExecutor(key["kw"], self.task)
@@ -109,6 +119,17 @@ class ResultConsumer(object):
                         self.task.update_status(
                             consts.TaskStatus.SOFT_ABORTING)
                    task_aborted = True
+
+                # save results chunks
+                chunk_size = CONF.raw_result_chunk_size
+                while len(self.results) >= chunk_size:
+                    results_chunk = self.results[:chunk_size]
+                    self.results = self.results[chunk_size:]
+                    results_chunk.sort(key=lambda x: x["timestamp"])
+                    self.workload.add_workload_data(self.workload_data_count,
+                                                    {"raw": results_chunk})
+                    self.workload_data_count += 1
+
             elif self.is_done.isSet():
                 break
             else:
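Note: the consumer now flushes a chunk as soon as raw_result_chunk_size results have accumulated, sorting each chunk by iteration start time before saving; anything left below the threshold stays in self.results and is flushed as the final chunk on exit (see the -153,12 hunk below). A standalone simulation of the loop with chunk_size=2, matching the new unit test at the end of this change:

    results, saved, chunk_size, chunk_order = [], [], 2, 0

    def consume(batch):
        global results, chunk_order
        results.extend(batch)
        while len(results) >= chunk_size:
            chunk = results[:chunk_size]
            results = results[chunk_size:]
            chunk.sort(key=lambda x: x["timestamp"])
            saved.append((chunk_order, chunk))
            chunk_order += 1

    consume([{"timestamp": 3}, {"timestamp": 2}, {"timestamp": 3.5}])
    consume([{"timestamp": 2.5}])
    # saved == [(0, [{"timestamp": 2}, {"timestamp": 3}]),
    #           (1, [{"timestamp": 2.5}, {"timestamp": 3.5}])]

Each chunk is only sorted internally, so timestamps are no longer globally ordered across chunks; consistent with that, the next hunk drops the one-shot global sort.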
@@ -136,9 +157,6 @@ class ResultConsumer(object):
                 self.task["uuid"]) == consts.TaskStatus.ABORTED:
             self.sla_checker.set_aborted_manually()
 
-        # NOTE(boris-42): Sort in order of starting instead of order of ending
-        self.results.sort(key=lambda x: x["timestamp"])
-
         load_duration = max(self.load_finished_at - self.load_started_at, 0)
 
         LOG.info("Load duration is: %s" % utils.format_float_to_str(
@@ -153,12 +171,17 @@ class ResultConsumer(object):
             "full_duration": self.finish - self.start,
             "sla": self.sla_checker.results(),
         }
-        self.runner.send_event(type="load_finished", value=results)
         if "hooks" in self.key["kw"]:
             self.event_thread.join()
             results["hooks"] = self.hook_executor.results()
 
-        self.workload.add_workload_data({"raw": self.results})
+        if self.results:
+            # NOTE(boris-42): Sort in order of starting
+            #                 instead of order of ending
+            self.results.sort(key=lambda x: x["timestamp"])
+            self.workload.add_workload_data(self.workload_data_count,
+                                            {"raw": self.results})
+
         self.workload.set_results(results)
 
     @staticmethod
@@ -254,7 +254,7 @@ class TasksTestCase(test.DBTestCase):
         }
         subtask = db.subtask_create(task_id, title="foo")
         workload = db.workload_create(task_id, subtask["uuid"], key)
-        db.workload_data_create(task_id, workload["uuid"], {"raw": []})
+        db.workload_data_create(task_id, workload["uuid"], 0, {"raw": []})
         db.workload_set_results(workload["uuid"], data)
 
         res = db.task_result_get_all_by_uuid(task_id)
@@ -311,7 +311,7 @@ class TasksTestCase(test.DBTestCase):
         data["sla"][0] = {"success": True}
         subtask = db.subtask_create(task_id, title="foo")
         workload = db.workload_create(task_id, subtask["uuid"], key)
-        db.workload_data_create(task_id, workload["uuid"], {"raw": []})
+        db.workload_data_create(task_id, workload["uuid"], 0, {"raw": []})
         db.workload_set_results(workload["uuid"], data)
 
         for task_id in (task1, task2):
@@ -355,7 +355,8 @@ class TasksTestCase(test.DBTestCase):
 
         subtask = db.subtask_create(task1["uuid"], title="foo")
         workload = db.workload_create(task1["uuid"], subtask["uuid"], key)
-        db.workload_data_create(task1["uuid"], workload["uuid"], {"raw": []})
+        db.workload_data_create(
+            task1["uuid"], workload["uuid"], 0, {"raw": []})
         db.workload_set_results(workload["uuid"], data)
 
         task1_full = db.task_get_detailed(task1["uuid"])
@@ -403,7 +404,8 @@ class TasksTestCase(test.DBTestCase):
 
         subtask = db.subtask_create(task1["uuid"], title="foo")
         workload = db.workload_create(task1["uuid"], subtask["uuid"], key)
-        db.workload_data_create(task1["uuid"], workload["uuid"], {"raw": []})
+        db.workload_data_create(
+            task1["uuid"], workload["uuid"], 0, {"raw": []})
         db.workload_set_results(workload["uuid"], data)
 
         task1_full = db.task_get_detailed_last()
@@ -463,7 +465,7 @@ class TasksTestCase(test.DBTestCase):
 
         subtask = db.subtask_create(task_id, title="foo")
         workload = db.workload_create(task_id, subtask["uuid"], key)
-        db.workload_data_create(task_id, workload["uuid"], raw_data)
+        db.workload_data_create(task_id, workload["uuid"], 0, raw_data)
         db.workload_set_results(workload["uuid"], data)
 
         res = db.task_result_get_all_by_uuid(task_id)
@@ -471,6 +473,80 @@ class TasksTestCase(test.DBTestCase):
         self.assertEqual(raw_data["raw"], res[0]["data"]["raw"])
         self.assertEqual(key, res[0]["key"])
 
+    def test_task_multiple_raw_result_create(self):
+        task_id = self._create_task()["uuid"]
+        key = {
+            "name": "atata",
+            "pos": 0,
+            "kw": {
+                "args": {"a": "A"},
+                "context": {"c": "C"},
+                "sla": {"s": "S"},
+                "runner": {"r": "R", "type": "T"},
+                "hooks": [],
+            }
+        }
+
+        subtask = db.subtask_create(task_id, title="foo")
+        workload = db.workload_create(task_id, subtask["uuid"], key)
+
+        db.workload_data_create(task_id, workload["uuid"], 0, {
+            "raw": [
+                {"error": "anError", "timestamp": 10, "duration": 1},
+                {"duration": 1, "timestamp": 10, "duration": 1},
+                {"duration": 2, "timestamp": 10, "duration": 1},
+                {"duration": 3, "timestamp": 10, "duration": 1},
+            ],
+        })
+
+        db.workload_data_create(task_id, workload["uuid"], 1, {
+            "raw": [
+                {"error": "anError2", "timestamp": 10, "duration": 1},
+                {"duration": 6, "timestamp": 10, "duration": 1},
+                {"duration": 5, "timestamp": 10, "duration": 1},
+                {"duration": 4, "timestamp": 10, "duration": 1},
+            ],
+        })
+
+        db.workload_data_create(task_id, workload["uuid"], 2, {
+            "raw": [
+                {"duration": 7, "timestamp": 10, "duration": 1},
+                {"duration": 8, "timestamp": 10, "duration": 1},
+            ],
+        })
+
+        db.workload_set_results(workload["uuid"], {
+            "sla": [{"success": True}],
+            "load_duration": 13,
+            "full_duration": 42
+        })
+
+        res = db.task_result_get_all_by_uuid(task_id)
+        self.assertEqual(len(res), 1)
+        self.assertEqual(res[0]["key"], key)
+        self.assertEqual(res[0]["data"], {
+            "raw": [
+                {"error": "anError", "timestamp": 10, "duration": 1},
+                {"duration": 1, "timestamp": 10, "duration": 1},
+                {"duration": 2, "timestamp": 10, "duration": 1},
+                {"duration": 3, "timestamp": 10, "duration": 1},
+                {"error": "anError2", "timestamp": 10, "duration": 1},
+                {"duration": 6, "timestamp": 10, "duration": 1},
+                {"duration": 5, "timestamp": 10, "duration": 1},
+                {"duration": 4, "timestamp": 10, "duration": 1},
+                {"duration": 7, "timestamp": 10, "duration": 1},
+                {"duration": 8, "timestamp": 10, "duration": 1},
+            ],
+            "sla": [{"success": True}],
+            "hooks": [],
+            "load_duration": 13,
+            "full_duration": 42
+        })
+
+        db.task_delete(task_id)
+        res = db.task_result_get_all_by_uuid(task_id)
+        self.assertEqual(len(res), 0)
+
 
 class SubtaskTestCase(test.DBTestCase):
     def setUp(self):
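Note: the test above stores chunks 0, 1 and 2 (4 + 4 + 2 iterations) and expects task_result_get_all_by_uuid() to return a single flattened ten-entry "raw" list in chunk order. One quirk worth flagging: several literals repeat the "duration" key. That is legal Python, the last occurrence simply wins:

    d = {"duration": 6, "timestamp": 10, "duration": 1}
    assert d == {"duration": 1, "timestamp": 10}

so the leading "duration" values in those rows are dead weight.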
@@ -544,7 +620,7 @@ class WorkloadTestCase(test.DBTestCase):
         }
 
         workload = db.workload_create(self.task_uuid, self.subtask_uuid, key)
-        db.workload_data_create(self.task_uuid, workload["uuid"], raw_data)
+        db.workload_data_create(self.task_uuid, workload["uuid"], 0, raw_data)
         workload = db.workload_set_results(workload["uuid"], data)
         self.assertEqual("atata", workload["name"])
         self.assertEqual(0, workload["position"])
@@ -576,7 +652,6 @@ class WorkloadTestCase(test.DBTestCase):
                 "runner": {"r": "R", "type": "T"}
             }
         }
-        raw_data = {"raw": []}
         data = {
             "sla": [
                 {"s": "S", "success": False},
@@ -588,7 +663,6 @@ class WorkloadTestCase(test.DBTestCase):
         }
 
         workload = db.workload_create(self.task_uuid, self.subtask_uuid, key)
-        db.workload_data_create(self.task_uuid, workload["uuid"], raw_data)
         workload = db.workload_set_results(workload["uuid"], data)
         self.assertEqual("atata", workload["name"])
         self.assertEqual(0, workload["position"])
@@ -633,7 +707,7 @@ class WorkloadDataTestCase(test.DBTestCase):
             ]
         }
         workload_data = db.workload_data_create(self.task_uuid,
-                                                self.workload_uuid, data)
+                                                self.workload_uuid, 0, data)
         self.assertEqual(3, workload_data["iteration_count"])
         self.assertEqual(1, workload_data["failed_iteration_count"])
         self.assertEqual(dt.datetime.fromtimestamp(1),
@@ -649,7 +723,7 @@ class WorkloadDataTestCase(test.DBTestCase):
         mock_time.return_value = 10
         data = {"raw": []}
         workload_data = db.workload_data_create(self.task_uuid,
-                                                self.workload_uuid, data)
+                                                self.workload_uuid, 0, data)
         self.assertEqual(0, workload_data["iteration_count"])
         self.assertEqual(0, workload_data["failed_iteration_count"])
         self.assertEqual(dt.datetime.fromtimestamp(10),
@@ -385,9 +385,10 @@ class WorkloadTestCase(test.TestCase):
         mock_workload_create.return_value = self.workload
         workload = objects.Workload("uuid1", "uuid2", {"bar": "baz"})
 
-        workload = workload.add_workload_data({"data": "foo"})
+        workload = workload.add_workload_data(0, {"data": "foo"})
         mock_workload_data_create.assert_called_once_with(
-            self.workload["task_uuid"], self.workload["uuid"], {"data": "foo"})
+            self.workload["task_uuid"], self.workload["uuid"],
+            0, {"data": "foo"})
 
     @mock.patch("rally.common.objects.task.db.workload_set_results")
     @mock.patch("rally.common.objects.task.db.workload_create")
@@ -550,7 +550,7 @@ class ResultConsumerTestCase(test.TestCase):
                 key, task, subtask, workload, runner, False):
             pass
 
-        workload.add_workload_data.assert_called_once_with({"raw": []})
+        self.assertFalse(workload.add_workload_data.called)
         workload.set_results.assert_called_once_with({
             "full_duration": 1,
             "sla": mock_sla_results,
@@ -671,6 +671,60 @@ class ResultConsumerTestCase(test.TestCase):
         mock_sla_instance.set_unexpected_failure.assert_has_calls(
             [mock.call(exc)])
 
+    @mock.patch("rally.task.engine.CONF")
+    @mock.patch("rally.common.objects.Task.get_status")
+    @mock.patch("rally.task.engine.ResultConsumer.wait_and_abort")
+    @mock.patch("rally.task.sla.SLAChecker")
+    def test_consume_results_chunked(
+            self, mock_sla_checker, mock_result_consumer_wait_and_abort,
+            mock_task_get_status, mock_conf):
+        mock_conf.raw_result_chunk_size = 2
+        mock_sla_instance = mock.MagicMock()
+        mock_sla_checker.return_value = mock_sla_instance
+        mock_task_get_status.return_value = consts.TaskStatus.RUNNING
+        key = {"kw": {"fake": 2}, "name": "fake", "pos": 0}
+        task = mock.MagicMock(spec=objects.Task)
+        subtask = mock.Mock(spec=objects.Subtask)
+        workload = mock.Mock(spec=objects.Workload)
+        runner = mock.MagicMock()
+
+        results = [
+            [{"duration": 1, "timestamp": 3},
+             {"duration": 2, "timestamp": 2},
+             {"duration": 3, "timestamp": 3}],
+            [{"duration": 4, "timestamp": 2},
+             {"duration": 5, "timestamp": 3}],
+            [{"duration": 6, "timestamp": 2}],
+            [{"duration": 7, "timestamp": 1}],
+        ]
+
+        runner.result_queue = collections.deque(results)
+        runner.event_queue = collections.deque()
+        with engine.ResultConsumer(
+                key, task, subtask, workload, runner, False) as consumer_obj:
+            pass
+
+        mock_sla_instance.add_iteration.assert_has_calls([
+            mock.call({"duration": 1, "timestamp": 3}),
+            mock.call({"duration": 2, "timestamp": 2}),
+            mock.call({"duration": 3, "timestamp": 3}),
+            mock.call({"duration": 4, "timestamp": 2}),
+            mock.call({"duration": 5, "timestamp": 3}),
+            mock.call({"duration": 6, "timestamp": 2}),
+            mock.call({"duration": 7, "timestamp": 1})])
+
+        self.assertEqual([{"duration": 7, "timestamp": 1}],
+                         consumer_obj.results)
+
+        workload.add_workload_data.assert_has_calls([
+            mock.call(0, {"raw": [{"duration": 2, "timestamp": 2},
+                                  {"duration": 1, "timestamp": 3}]}),
+            mock.call(1, {"raw": [{"duration": 4, "timestamp": 2},
+                                  {"duration": 3, "timestamp": 3}]}),
+            mock.call(2, {"raw": [{"duration": 6, "timestamp": 2},
+                                  {"duration": 5, "timestamp": 3}]}),
+            mock.call(3, {"raw": [{"duration": 7, "timestamp": 1}]})])
+
     @mock.patch("rally.task.engine.LOG")
     @mock.patch("rally.task.hook.HookExecutor")
     @mock.patch("rally.task.engine.time.time")
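Note: walking this test through the consumer loop explains the expected calls. The queue delivers batches of 3, 2, 1 and 1 results while the patched chunk size is 2, so chunks are cut at arrival boundaries, durations (2, 1), (4, 3) and (6, 5), each sorted by timestamp internally. The lone timestamp-1 result never fills a chunk, so it is still sitting in consumer_obj.results after the with-block (the assertEqual above) and is flushed as chunk 3 by the exit path, the final asserted call.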
@@ -719,7 +773,7 @@ class ResultConsumerTestCase(test.TestCase):
             mock.call(event_type="iteration", value=3)
         ])
 
-        workload.add_workload_data.assert_called_once_with({"raw": []})
+        self.assertFalse(workload.add_workload_data.called)
         workload.set_results.assert_called_once_with({
             "full_duration": 1,
             "sla": mock_sla_results,