Move task results loader helper to a separate module

Change-Id: I97e7e2bd6fd79f1209ca027349d0489729550500
Author: Andrey Kurilin, 2020-03-22 20:17:45 +02:00
parent 64c133a5b6
commit 6611305108
7 changed files with 510 additions and 515 deletions
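The change extracts the private TaskCommands._load_task_results_file() helper from rally/cli/commands/task.py into a new rally.cli.task_results_loader module with a single public entry point, load(path). A minimal usage sketch of the new API (the results path below is illustrative, not from this commit):

    from rally.cli import task_results_loader

    # load() expands the user path, parses the JSON file, detects whether it
    # holds the old list-based format or the new {"tasks": [...]} format, and
    # returns a list of task dicts normalized to the current schema. It raises
    # FailedToLoadResults for invalid JSON or an unrecognized structure.
    tasks = task_results_loader.load("~/task_results.json")  # hypothetical path
    for task in tasks:
        print(task["env_name"], len(task["subtasks"]))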

rally/cli/commands/task.py

@@ -16,17 +16,15 @@
"""Rally command: task"""
from __future__ import print_function
import datetime as dt
import itertools
import json
import os
import sys
import webbrowser
import jsonschema
from rally.cli import cliutils
from rally.cli import envutils
from rally.cli import task_results_loader
from rally.cli import yamlutils as yaml
from rally.common import logging
from rally.common import utils as rutils
@@ -36,98 +34,17 @@ from rally import exceptions
from rally import plugins
from rally.task import atomic
from rally.task.processing import charts
from rally.task import utils as tutils
from rally.utils import strutils
LOG = logging.getLogger(__name__)
OLD_TASK_RESULT_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"key": {
"type": "object",
"properties": {
"kw": {
"type": "object"
},
"name": {
"type": "string"
},
"pos": {
"type": "integer"
},
},
"required": ["kw", "name", "pos"]
},
"sla": {
"type": "array",
"items": {
"type": "object",
"properties": {
"criterion": {
"type": "string"
},
"detail": {
"type": "string"
},
"success": {
"type": "boolean"
}
}
}
},
"hooks": {"type": "array"},
"result": {
"type": "array",
"items": {
"type": "object",
"properties": {
"atomic_actions": {
"type": "object"
},
"duration": {
"type": "number"
},
"error": {
"type": "array"
},
"idle_duration": {
"type": "number"
},
"output": {"type": "object"}
},
"required": ["atomic_actions", "duration", "error",
"idle_duration"]
},
"minItems": 0
},
"load_duration": {
"type": "number",
},
"full_duration": {
"type": "number",
},
"created_at": {
"type": "string"
}
},
"required": ["key", "sla", "result", "load_duration", "full_duration"],
"additionalProperties": False
}
class FailedToLoadTask(exceptions.RallyException):
error_code = 117
msg_fmt = "Invalid %(source)s passed:\n\n\t %(msg)s"
class FailedToLoadResults(exceptions.RallyException):
error_code = 225
msg_fmt = "ERROR: Invalid task result format in %(source)s\n\n\t%(msg)s"
class TaskCommands(object):
"""Set of commands that allow you to manage tasks and results.
@@ -625,137 +542,6 @@ class TaskCommands(object):
print("There are no tasks. To run a new task, use:\n"
"\trally task start")
def _load_task_results_file(self, api, task_id):
"""Load the json file which is created by `rally task results`"""
with open(os.path.expanduser(task_id)) as inp_js:
tasks_results = json.loads(inp_js.read())
if isinstance(tasks_results, list):
# it is an old format:
task = {"version": 2,
"title": "Task loaded from a file.",
"description": "Auto-ported from task format V1.",
"uuid": "n/a",
"env_name": "n/a",
"env_uuid": "n/a",
"tags": [],
"status": consts.TaskStatus.FINISHED,
"subtasks": []}
start_time = None
for result in tasks_results:
try:
jsonschema.validate(
result, OLD_TASK_RESULT_SCHEMA)
except jsonschema.ValidationError as e:
raise FailedToLoadResults(source=task_id,
msg=str(e))
iter_count = 0
failed_iter_count = 0
min_duration = None
max_duration = None
for itr in result["result"]:
if start_time is None or itr["timestamp"] < start_time:
start_time = itr["timestamp"]
# NOTE(chenhb): backward compatibility for atomic_actions
itr["atomic_actions"] = list(
tutils.WrapperForAtomicActions(itr["atomic_actions"],
itr["timestamp"]))
iter_count += 1
if itr.get("error"):
failed_iter_count += 1
duration = itr.get("duration", 0)
if max_duration is None or duration > max_duration:
max_duration = duration
if min_duration is None or min_duration > duration:
min_duration = duration
durations_stat = charts.MainStatsTable(
{"total_iteration_count": iter_count})
for itr in result["result"]:
durations_stat.add_iteration(itr)
created_at = dt.datetime.strptime(result["created_at"],
"%Y-%d-%mT%H:%M:%S")
updated_at = created_at + dt.timedelta(
seconds=result["full_duration"])
created_at = created_at.strftime(consts.TimeFormat.ISO8601)
updated_at = updated_at.strftime(consts.TimeFormat.ISO8601)
pass_sla = all(s.get("success") for s in result["sla"])
runner_type = result["key"]["kw"]["runner"].pop("type")
for h in result["hooks"]:
trigger = h["config"]["trigger"]
h["config"] = {
"description": h["config"].get("description"),
"action": (h["config"]["name"], h["config"]["args"]),
"trigger": (trigger["name"], trigger["args"])}
workload = {"uuid": "n/a",
"name": result["key"]["name"],
"position": result["key"]["pos"],
"description": result["key"].get("description",
""),
"full_duration": result["full_duration"],
"load_duration": result["load_duration"],
"total_iteration_count": iter_count,
"failed_iteration_count": failed_iter_count,
"min_duration": min_duration,
"max_duration": max_duration,
"start_time": start_time,
"created_at": created_at,
"updated_at": updated_at,
"args": result["key"]["kw"]["args"],
"runner_type": runner_type,
"runner": result["key"]["kw"]["runner"],
"hooks": result["hooks"],
"sla": result["key"]["kw"]["sla"],
"sla_results": {"sla": result["sla"]},
"pass_sla": pass_sla,
"contexts": result["key"]["kw"]["context"],
"contexts_results": [],
"data": sorted(result["result"],
key=lambda x: x["timestamp"]),
"statistics": {
"durations": durations_stat.to_dict()},
}
task["subtasks"].append(
{"title": "A SubTask",
"description": "",
"workloads": [workload]})
return [task]
elif isinstance(tasks_results, dict) and "tasks" in tasks_results:
for task_result in tasks_results["tasks"]:
try:
jsonschema.validate(task_result,
api.task.TASK_SCHEMA)
except jsonschema.ValidationError as e:
msg = str(e)
raise exceptions.RallyException(
"ERROR: Invalid task result format\n\n\t%s" % msg)
task_result.setdefault("env_name", "n/a")
task_result.setdefault("env_uuid", "n/a")
for subtask in task_result["subtasks"]:
for workload in subtask["workloads"]:
workload.setdefault("contexts_results", [])
workload["runner_type"], workload["runner"] = list(
workload["runner"].items())[0]
workload["name"], workload["args"] = list(
workload.pop("scenario").items())[0]
return tasks_results["tasks"]
else:
raise FailedToLoadResults(
source=task_id, msg="Wrong format")
@cliutils.args("--out", metavar="<path>",
type=str, dest="out", required=False,
help="Path to output file.")
@@ -907,7 +693,7 @@ class TaskCommands(object):
for task_file_or_uuid in tasks:
if os.path.exists(os.path.expanduser(task_file_or_uuid)):
exported_tasks.extend(
self._load_task_results_file(api, task_file_or_uuid)
task_results_loader.load(task_file_or_uuid)
)
else:
exported_tasks.append(task_file_or_uuid)
@@ -964,7 +750,7 @@ class TaskCommands(object):
"""Import json results of a test into rally database"""
if os.path.exists(os.path.expanduser(task_file)):
tasks_results = self._load_task_results_file(api, task_file)
tasks_results = task_results_loader.load(task_file)
for task_results in tasks_results:
task = api.task.import_results(deployment=deployment,
task_results=task_results,

rally/cli/task_results_loader.py

@@ -0,0 +1,258 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime as dt
import json
import os
import jsonschema
from rally import api
from rally import consts
from rally import exceptions
from rally.task.processing import charts
OLD_TASK_RESULT_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"key": {
"type": "object",
"properties": {
"kw": {
"type": "object"
},
"name": {
"type": "string"
},
"pos": {
"type": "integer"
},
},
"required": ["kw", "name", "pos"]
},
"sla": {
"type": "array",
"items": {
"type": "object",
"properties": {
"criterion": {
"type": "string"
},
"detail": {
"type": "string"
},
"success": {
"type": "boolean"
}
}
}
},
"hooks": {"type": "array"},
"result": {
"type": "array",
"items": {
"type": "object",
"properties": {
"atomic_actions": {
"type": "object"
},
"duration": {
"type": "number"
},
"error": {
"type": "array"
},
"idle_duration": {
"type": "number"
},
"output": {"type": "object"}
},
"required": ["atomic_actions", "duration", "error",
"idle_duration"]
},
"minItems": 0
},
"load_duration": {
"type": "number",
},
"full_duration": {
"type": "number",
},
"created_at": {
"type": "string"
}
},
"required": ["key", "sla", "result", "load_duration", "full_duration"],
"additionalProperties": False
}
class FailedToLoadResults(exceptions.RallyException):
error_code = 225
msg_fmt = "ERROR: Invalid task result format in %(source)s\n\n\t%(msg)s"
def _update_atomic_actions(atomic_actions, started_at):
"""Convert atomic actions in old format to latest one."""
new = []
for name, duration in atomic_actions.items():
finished_at = started_at + duration
new.append({
"name": name,
"started_at": started_at,
"finished_at": finished_at,
"children": []}
)
started_at = finished_at
return new
def _update_old_results(tasks_results, path):
"""Converts tasks results in old format to latest one."""
task = {"version": 2,
"title": "Task loaded from a file.",
"description": "Auto-ported from task format V1.",
"uuid": "n/a",
"env_name": "n/a",
"env_uuid": "n/a",
"tags": [],
"status": consts.TaskStatus.FINISHED,
"subtasks": []}
start_time = None
for result in tasks_results:
try:
jsonschema.validate(
result, OLD_TASK_RESULT_SCHEMA)
except jsonschema.ValidationError as e:
raise FailedToLoadResults(source=path,
msg=str(e))
iter_count = 0
failed_iter_count = 0
min_duration = None
max_duration = None
for itr in result["result"]:
if start_time is None or itr["timestamp"] < start_time:
start_time = itr["timestamp"]
# NOTE(chenhb): backward compatibility for atomic_actions
itr["atomic_actions"] = _update_atomic_actions(
itr["atomic_actions"], started_at=itr["timestamp"])
iter_count += 1
if itr.get("error"):
failed_iter_count += 1
duration = itr.get("duration", 0)
if max_duration is None or duration > max_duration:
max_duration = duration
if min_duration is None or min_duration > duration:
min_duration = duration
durations_stat = charts.MainStatsTable(
{"total_iteration_count": iter_count})
for itr in result["result"]:
durations_stat.add_iteration(itr)
created_at = dt.datetime.strptime(result["created_at"],
"%Y-%d-%mT%H:%M:%S")
updated_at = created_at + dt.timedelta(
seconds=result["full_duration"])
created_at = created_at.strftime(consts.TimeFormat.ISO8601)
updated_at = updated_at.strftime(consts.TimeFormat.ISO8601)
pass_sla = all(s.get("success") for s in result["sla"])
runner_type = result["key"]["kw"]["runner"].pop("type")
for h in result["hooks"]:
trigger = h["config"]["trigger"]
h["config"] = {
"description": h["config"].get("description"),
"action": (h["config"]["name"], h["config"]["args"]),
"trigger": (trigger["name"], trigger["args"])}
workload = {"uuid": "n/a",
"name": result["key"]["name"],
"position": result["key"]["pos"],
"description": result["key"].get("description",
""),
"full_duration": result["full_duration"],
"load_duration": result["load_duration"],
"total_iteration_count": iter_count,
"failed_iteration_count": failed_iter_count,
"min_duration": min_duration,
"max_duration": max_duration,
"start_time": start_time,
"created_at": created_at,
"updated_at": updated_at,
"args": result["key"]["kw"]["args"],
"runner_type": runner_type,
"runner": result["key"]["kw"]["runner"],
"hooks": result["hooks"],
"sla": result["key"]["kw"]["sla"],
"sla_results": {"sla": result["sla"]},
"pass_sla": pass_sla,
"contexts": result["key"]["kw"]["context"],
"contexts_results": [],
"data": sorted(result["result"],
key=lambda x: x["timestamp"]),
"statistics": {
"durations": durations_stat.to_dict()},
}
task["subtasks"].append(
{"title": "A SubTask",
"description": "",
"workloads": [workload]})
return [task]
def _update_new_results(tasks_results):
for task_result in tasks_results["tasks"]:
try:
jsonschema.validate(task_result, api._Task.TASK_SCHEMA)
except jsonschema.ValidationError as e:
raise exceptions.RallyException(
"ERROR: Invalid task result format\n\n\t%s" % str(e)) from None
task_result.setdefault("env_name", "n/a")
task_result.setdefault("env_uuid", "n/a")
for subtask in task_result["subtasks"]:
for workload in subtask["workloads"]:
workload.setdefault("contexts_results", [])
workload["runner_type"], workload["runner"] = list(
workload["runner"].items())[0]
workload["name"], workload["args"] = list(
workload.pop("scenario").items())[0]
return tasks_results["tasks"]
def load(path):
with open(os.path.expanduser(path)) as f:
raw_tasks_results = f.read()
try:
tasks_results = json.loads(raw_tasks_results)
except ValueError:
raise FailedToLoadResults(
source=path, msg="error while loading JSON.") from None
if isinstance(tasks_results, list):
return _update_old_results(tasks_results, path)
elif isinstance(tasks_results, dict) and "tasks" in tasks_results:
return _update_new_results(tasks_results)
else:
raise FailedToLoadResults(
source=path, msg="Wrong format")
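As a concrete illustration of the conversion the new module performs, _update_atomic_actions() lays the actions out back to back starting from started_at, turning the old name-to-duration mapping into timestamped records. A short sketch (the values mirror the unit test later in this commit):

    from rally.cli import task_results_loader

    old_style = {"action_1": 1, "action_2": 2}  # old format: name -> duration
    new_style = task_results_loader._update_atomic_actions(old_style, started_at=1)
    # [{"name": "action_1", "started_at": 1, "finished_at": 2, "children": []},
    #  {"name": "action_2", "started_at": 2, "finished_at": 4, "children": []}]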

rally/task/utils.py

@@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import itertools
import time
import traceback
@@ -401,70 +400,3 @@ class ActionBuilder(object):
self._build(binding["action"], times,
*(binding["args"] + args), **dft_kwargs))
return bound_actions
# TODO(andreykurilin): We need to implement some wrapper for atomic actions;
# we can use this wrapper to simulate the new and old formats.
class WrapperForAtomicActions(list):
def __init__(self, atomic_actions, timestamp=0):
self.timestamp = timestamp
if isinstance(atomic_actions, list):
self.__atomic_actions = atomic_actions
self.__old_atomic_actions = self._convert_new_atomic_actions(
self.__atomic_actions)
else:
self.__atomic_actions = self._convert_old_atomic_actions(
atomic_actions)
self.__old_atomic_actions = atomic_actions
super(WrapperForAtomicActions, self).__init__(self.__atomic_actions)
def _convert_old_atomic_actions(self, old_atomic_actions):
"""Convert atomic actions to new format. """
atomic_actions = []
started_at = self.timestamp
for name, duration in old_atomic_actions.items():
finished_at = started_at + duration
atomic_actions.append({"name": name,
"started_at": started_at,
"finished_at": finished_at,
"children": []})
started_at = finished_at
return atomic_actions
def _convert_new_atomic_actions(self, atomic_actions):
"""Convert atomic actions to old format. """
old_style = collections.OrderedDict()
for action in atomic_actions:
duration = action["finished_at"] - action["started_at"]
if action["name"] in old_style:
name_template = action["name"] + " (%i)"
i = 2
while name_template % i in old_style:
i += 1
old_style[name_template % i] = duration
else:
old_style[action["name"]] = duration
return old_style
def items(self):
return self.__old_atomic_actions.items()
def get(self, name, default=None):
return self.__old_atomic_actions.get(name, default)
def __iter__(self):
return iter(self.__atomic_actions)
def __len__(self):
return len(self.__atomic_actions)
def __getitem__(self, item):
if isinstance(item, int):
# it is a call to list:
return self.__atomic_actions[item]
else:
return self.__old_atomic_actions[item]
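For reference, the WrapperForAtomicActions class deleted here exposed both formats at once: it iterated and indexed by position as the new list format, while items(), get(), and string keys answered in the old dict format. A brief sketch of the removed behavior, as it worked before this commit:

    from rally.task import utils

    w = utils.WrapperForAtomicActions({"foo": 1.0}, timestamp=1)
    list(w)       # [{"name": "foo", "started_at": 1,
                  #   "finished_at": 2.0, "children": []}]
    w["foo"]      # 1.0 -- string keys use the old name -> duration view
    w.get("bar")  # None
    len(w)        # 1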

tests/ci/cover.sh

@@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
ALLOWED_EXTRA_MISSING=40
ALLOWED_EXTRA_MISSING=4
show_diff () {
head -1 $1

tests/unit/cli/commands/test_task.py

@@ -896,12 +896,13 @@ class TaskCommandsTestCase(test.TestCase):
self.assertRaises(exceptions.DBRecordNotFound, self.task.use,
self.fake_api, task_id)
@mock.patch("rally.cli.task_results_loader.load")
@mock.patch("rally.cli.commands.task.os.path")
@mock.patch("rally.cli.commands.task.webbrowser.open_new_tab")
@mock.patch("rally.cli.commands.task.open", create=True)
@mock.patch("rally.cli.commands.task.print")
def test_export(self, mock_print, mock_open, mock_open_new_tab,
mock_path):
mock_path, mock_load):
# file
self.fake_api.task.export.return_value = {
@@ -911,15 +912,14 @@
mock_path.realpath.return_value = "real_path"
mock_fd = mock.mock_open()
mock_open.side_effect = mock_fd
self.task._load_task_results_file = mock.MagicMock(
return_value=[{"task": "task_1"}, {"task": "task2"}])
mock_load.return_value = [{"task": "task_1"}, {"task": "task2"}]
self.task.export(self.fake_api, tasks=["uuid", "file"],
output_type="json", output_dest="output_dest",
open_it=True)
self.fake_api.task.export.assert_called_once_with(
tasks=["uuid"] + self.task._load_task_results_file.return_value,
tasks=["uuid"] + mock_load.return_value,
output_type="json",
output_dest="output_dest"
)
@@ -1034,160 +1034,18 @@
mock.call(error_traceback or "No traceback available.")
], any_order=False)
@mock.patch("rally.cli.commands.task.open", create=True)
@mock.patch("rally.cli.commands.task.json.loads")
@mock.patch("rally.cli.commands.task.jsonschema.validate",
return_value=None)
def test__load_task_results_file(self, mock_validate, mock_loads,
mock_open):
task_file = "/tmp/task.json"
workload = {
"uuid": "n/a",
"full_duration": 2, "load_duration": 1,
"created_at": "2017-07-01T07:03:01",
"updated_at": "2017-07-01T07:03:03",
"total_iteration_count": 2,
"failed_iteration_count": 1,
"min_duration": 3,
"max_duration": 5,
"start_time": 1,
"name": "Foo.bar", "description": "descr",
"position": 2,
"args": {"key1": "value1"},
"runner_type": "constant",
"runner": {"time": 3},
"hooks": [{"config": {
"description": "descr",
"action": ("foo", {"arg1": "v1"}),
"trigger": ("t", {"a2", "v2"})}}],
"pass_sla": True,
"sla": {"failure_rate": {"max": 0}},
"sla_results": {"sla": [{"success": True}]},
"contexts": {"users": {}},
"contexts_results": [],
"data": [{"timestamp": 1, "atomic_actions": {"foo": 1.0,
"bar": 1.0},
"duration": 5, "idle_duration": 0, "error": [{}]},
{"timestamp": 2, "atomic_actions": {"bar": 1.1},
"duration": 3, "idle_duration": 0, "error": []}],
"statistics": {"durations": mock.ANY}
}
results = [{
"hooks": [{
"config": {
"name": "foo",
"args": {"arg1": "v1"},
"description": "descr",
"trigger": {"name": "t", "args": {"a2", "v2"}}}}],
"key": {
"name": workload["name"],
"description": workload["description"],
"pos": workload["position"],
"kw": {
"args": workload["args"],
"runner": {"type": "constant", "time": 3},
"hooks": [{
"name": "foo",
"args": {"arg1": "v1"},
"description": "descr",
"trigger": {"name": "t", "args": {"a2", "v2"}}}],
"sla": workload["sla"],
"context": workload["contexts"]}},
"sla": workload["sla_results"]["sla"],
"result": workload["data"],
"full_duration": workload["full_duration"],
"load_duration": workload["load_duration"],
"created_at": "2017-01-07T07:03:01"}
]
mock_loads.return_value = results
ret = self.task._load_task_results_file(self.fake_api, task_file)
self.assertEqual([{
"version": 2,
"title": "Task loaded from a file.",
"description": "Auto-ported from task format V1.",
"uuid": "n/a",
"env_uuid": "n/a",
"env_name": "n/a",
"status": "finished",
"tags": [],
"subtasks": [{
"title": "A SubTask",
"description": "",
"workloads": [workload]}]}], ret)
@mock.patch("rally.cli.commands.task.open", create=True)
@mock.patch("rally.cli.commands.task.json.loads")
@mock.patch("rally.cli.commands.task.jsonschema.validate")
def test__load_task_new_results_file(self, mock_validate,
mock_loads, mock_open):
task_file = "/tmp/task.json"
results = {
"tasks": [{
"env_uuid": "env-uuid-1",
"env_name": "env-name-1",
"subtasks": [{
"workloads": [{
"contexts": "contexts",
"scenario": {"Foo.bar": {}},
"runner": {"constant": {
"times": 100,
"concurrency": 5
}}
}]
}]
}]
}
mock_loads.return_value = results
ret = self.task._load_task_results_file(self.fake_api, task_file)
self.assertEqual([{
"env_uuid": "env-uuid-1",
"env_name": "env-name-1",
"subtasks": [{
"workloads": [{
"args": {},
"name": "Foo.bar",
"contexts": "contexts",
"contexts_results": [],
"runner_type": "constant",
"runner": {
"times": 100,
"concurrency": 5
}
}]
}]
}], ret)
@mock.patch("rally.cli.commands.task.open", create=True)
@mock.patch("rally.cli.commands.task.json.loads")
def test__load_task_results_file_wrong_format(self, mock_loads, mock_open):
task_id = "/tmp/task.json"
mock_loads.return_value = "results"
self.assertRaises(task.FailedToLoadResults,
self.task._load_task_results_file,
api=self.real_api, task_id=task_id)
mock_loads.return_value = ["results"]
self.assertRaises(task.FailedToLoadResults,
self.task._load_task_results_file,
api=self.real_api, task_id=task_id)
@mock.patch("rally.cli.task_results_loader.load")
@mock.patch("rally.cli.commands.task.os.path")
def test_import_results(self, mock_os_path):
def test_import_results(self, mock_os_path, mock_load):
mock_os_path.exists.return_value = True
mock_os_path.expanduser = lambda path: path
self.task._load_task_results_file = mock.MagicMock(
return_value=["results"]
)
mock_load.return_value = ["results"]
self.task.import_results(self.fake_api,
"deployment_uuid",
"task_file", tags=["tag"])
self.task._load_task_results_file.assert_called_once_with(
self.fake_api, "task_file"
)
mock_load.assert_called_once_with("task_file")
self.fake_api.task.import_results.assert_called_once_with(
deployment="deployment_uuid", task_results="results",
tags=["tag"])

tests/unit/cli/test_task_results_loader.py

@@ -0,0 +1,240 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from unittest import mock
from rally.cli import task_results_loader
from tests.unit import test
PATH = "rally.cli.task_results_loader"
class LoaderTestCase(test.TestCase):
@mock.patch("%s._update_old_results" % PATH)
@mock.patch("%s._update_new_results" % PATH)
@mock.patch("%s.open" % PATH)
def test_load(self, mock_open,
mock__update_new_results,
mock__update_old_results):
r_file = mock_open.return_value.__enter__.return_value
# case 1: the file contains invalid JSON
r_file.read.return_value = ""
self.assertRaises(task_results_loader.FailedToLoadResults,
task_results_loader.load,
"/some/path")
self.assertFalse(mock__update_new_results.called)
self.assertFalse(mock__update_old_results.called)
mock__update_new_results.reset_mock()
mock__update_old_results.reset_mock()
# case 2: the file contains valid JSON with a dict that doesn't have
# 'tasks' key
r_file.read.return_value = "{}"
self.assertRaises(task_results_loader.FailedToLoadResults,
task_results_loader.load,
"/some/path")
self.assertFalse(mock__update_new_results.called)
self.assertFalse(mock__update_old_results.called)
mock__update_new_results.reset_mock()
mock__update_old_results.reset_mock()
# case 3: the file contains valid JSON with a dict that has
# a 'tasks' key
r_file.read.return_value = "{\"tasks\": \"foo\"}"
self.assertEqual(mock__update_new_results.return_value,
task_results_loader.load("/some/path"))
mock__update_new_results.assert_called_once_with({"tasks": "foo"})
self.assertFalse(mock__update_old_results.called)
mock__update_new_results.reset_mock()
mock__update_old_results.reset_mock()
# case 4: the file contains valid JSON with a list
r_file.read.return_value = "[\"foo\"]"
self.assertEqual(mock__update_old_results.return_value,
task_results_loader.load("/some/path"))
self.assertFalse(mock__update_new_results.called)
mock__update_old_results.assert_called_once_with(["foo"], "/some/path")
def test__update_new_results(self):
results = {
"tasks": [{
"env_uuid": "env-uuid-1",
"env_name": "env-name-1",
"subtasks": [{
"workloads": [{
"contexts": {"xxx": {}},
"scenario": {"Foo.bar": {}},
"runner": {
"constant": {
"times": 100,
"concurrency": 5
}
},
"sla": {},
"sla_results": {},
"position": 0,
"pass_sla": True,
"statistics": {},
"data": [],
"full_duration": 5,
"load_duration": 2,
"total_iteration_count": 3,
"failed_iteration_count": 0
}]
}]
}]
}
self.assertEqual(
[
{
"env_uuid": "env-uuid-1",
"env_name": "env-name-1",
"subtasks": [{
"workloads": [{
"args": {},
"name": "Foo.bar",
"contexts": {"xxx": {}},
"contexts_results": [],
"runner_type": "constant",
"runner": {
"times": 100,
"concurrency": 5
},
"sla": {},
"sla_results": {},
"position": 0,
"pass_sla": True,
"statistics": {},
"data": [],
"full_duration": 5,
"load_duration": 2,
"total_iteration_count": 3,
"failed_iteration_count": 0
}]
}]
}
],
task_results_loader._update_new_results(results)
)
def test__update_old_results(self):
workload = {
"uuid": "n/a",
"full_duration": 2, "load_duration": 1,
"created_at": "2017-07-01T07:03:01",
"updated_at": "2017-07-01T07:03:03",
"total_iteration_count": 2,
"failed_iteration_count": 1,
"min_duration": 3,
"max_duration": 5,
"start_time": 1,
"name": "Foo.bar", "description": "descr",
"position": 2,
"args": {"key1": "value1"},
"runner_type": "constant",
"runner": {"time": 3},
"hooks": [{"config": {
"description": "descr",
"action": ("foo", {"arg1": "v1"}),
"trigger": ("t", {"a2", "v2"})}}],
"pass_sla": True,
"sla": {"failure_rate": {"max": 0}},
"sla_results": {"sla": [{"success": True}]},
"contexts": {"users": {}},
"contexts_results": [],
"data": [{"timestamp": 1, "atomic_actions": {"foo": 1.0,
"bar": 1.0},
"duration": 5, "idle_duration": 0, "error": [{}]},
{"timestamp": 2, "atomic_actions": {"bar": 1.1},
"duration": 3, "idle_duration": 0, "error": []}],
"statistics": {"durations": mock.ANY}
}
results = [{
"hooks": [{
"config": {
"name": "foo",
"args": {"arg1": "v1"},
"description": "descr",
"trigger": {"name": "t", "args": {"a2", "v2"}}}}],
"key": {
"name": workload["name"],
"description": workload["description"],
"pos": workload["position"],
"kw": {
"args": workload["args"],
"runner": {"type": "constant", "time": 3},
"hooks": [{
"name": "foo",
"args": {"arg1": "v1"},
"description": "descr",
"trigger": {"name": "t", "args": {"a2", "v2"}}}],
"sla": workload["sla"],
"context": workload["contexts"]}},
"sla": workload["sla_results"]["sla"],
"result": workload["data"],
"full_duration": workload["full_duration"],
"load_duration": workload["load_duration"],
"created_at": "2017-01-07T07:03:01"}
]
self.assertEqual(
[
{
"version": 2,
"title": "Task loaded from a file.",
"description": "Auto-ported from task format V1.",
"uuid": "n/a",
"env_uuid": "n/a",
"env_name": "n/a",
"status": "finished",
"tags": [],
"subtasks": [{
"title": "A SubTask",
"description": "",
"workloads": [workload]}]
}
],
task_results_loader._update_old_results(results, "xxx")
)
def test__update_atomic_actions(self):
atomic_actions = collections.OrderedDict(
[("action_1", 1), ("action_2", 2)])
self.assertEqual(
[
{
"name": "action_1",
"started_at": 1,
"finished_at": 2,
"children": []
},
{
"name": "action_2",
"started_at": 2,
"finished_at": 4,
"children": []
}
],
task_results_loader._update_atomic_actions(atomic_actions, 1)
)

tests/unit/task/test_utils.py

@@ -13,12 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import datetime as dt
from unittest import mock
import uuid
import ddt
from jsonschema import exceptions as schema_exceptions
from rally import exceptions
@@ -549,80 +547,3 @@ class WaitForStatusTestCase(test.TestCase):
utils.wait_for_status,
resource=res, ready_statuses=["ready"],
update_resource=upd, timeout=2, id_attr="uuid")
@ddt.ddt
class WrapperForAtomicActionsTestCase(test.TestCase):
def test_dict_atomic(self):
atomic_actions = collections.OrderedDict(
[("action_1", 1), ("action_2", 2)])
atomic_wrapper = utils.WrapperForAtomicActions(atomic_actions, 1)
self.assertEqual(1, atomic_wrapper["action_1"])
self.assertEqual(2, atomic_wrapper["action_2"])
self.assertEqual(atomic_actions.items(),
atomic_wrapper.items())
self.assertEqual(1, atomic_wrapper.get("action_1"))
self.assertIsNone(atomic_wrapper.get("action_3"))
self.assertEqual(2, len(atomic_wrapper))
self.assertEqual([{"name": "action_1", "started_at": 1,
"finished_at": 2, "children": []},
{"name": "action_2", "started_at": 2,
"finished_at": 4, "children": []}
], atomic_wrapper)
def test_list_atomic(self):
atomic_actions = [{"name": "action_1", "started_at": 1,
"finished_at": 2, "children": []},
{"name": "action_2", "started_at": 2,
"finished_at": 4, "children": []}]
atomic_wrapper = utils.WrapperForAtomicActions(atomic_actions)
self.assertEqual(1, atomic_wrapper["action_1"])
self.assertEqual(2, atomic_wrapper["action_2"])
self.assertEqual(
collections.OrderedDict(
[("action_1", 1), ("action_2", 2)]).items(),
atomic_wrapper.items())
self.assertEqual(atomic_actions[0], atomic_wrapper[0])
self.assertEqual(atomic_actions[1], atomic_wrapper[1])
self.assertEqual(1, atomic_wrapper.get("action_1"))
self.assertIsNone(atomic_wrapper.get("action_3"))
self.assertEqual(2, len(atomic_wrapper))
self.assertEqual(atomic_actions[0], next(iter(atomic_wrapper)))
def test__convert_new_atomic_actions(self):
atomic_actions = collections.OrderedDict(
[("action_1", 1), ("action_2", 2)])
atomic_wrapper = utils.WrapperForAtomicActions(atomic_actions)
self.assertEqual(
[{"name": "action_1", "started_at": 0,
"finished_at": 1, "children": []},
{"name": "action_2", "started_at": 1,
"finished_at": 3, "children": []}],
atomic_wrapper._convert_old_atomic_actions(atomic_actions))
@ddt.data(
{"atomic_actions": [{"name": "some", "started_at": 1.0,
"finished_at": 2.0, "children": []}],
"expected": {"some": 1.0}},
{"atomic_actions": [{"name": "some", "started_at": 1.0,
"finished_at": 2.0, "children": []},
{"name": "some", "started_at": 2.0,
"finished_at": 3.0, "children": []}],
"expected": {"some": 1.0, "some (2)": 1.0}},
{"atomic_actions": [{"name": "some", "started_at": 1.0,
"finished_at": 2.0, "children": []},
{"name": "some", "started_at": 2.0,
"finished_at": 3.0, "children": []},
{"name": "some", "started_at": 3.0,
"finished_at": 4.0, "children": []}
],
"expected": {"some": 1.0, "some (2)": 1.0, "some (3)": 1.0}}
)
@ddt.unpack
def test_convert_new_atomic_actions(self, atomic_actions, expected):
atomic_wrapper = utils.WrapperForAtomicActions(atomic_actions)
self.assertEqual(expected,
atomic_wrapper._convert_new_atomic_actions(
atomic_actions))