Merge "New plugin type - Hook"
This commit is contained in:
commit
faa00bea07
@ -96,6 +96,52 @@ OUTPUT_SCHEMA = {
|
||||
"additionalProperties": False
|
||||
}
|
||||
|
||||
HOOK_RESULT_SCHEMA = {
|
||||
"type": "object",
|
||||
"$schema": consts.JSON_SCHEMA,
|
||||
"properties": {
|
||||
"hook": {"type": "string"},
|
||||
"started_at": {"type": "number"},
|
||||
"finished_at": {"type": "number"},
|
||||
"triggered_by": {
|
||||
"type": "object",
|
||||
"oneOf": [
|
||||
{
|
||||
"properties": {
|
||||
"iteration": {"type": "integer"},
|
||||
},
|
||||
"required": ["iteration"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"time": {"type": "integer"},
|
||||
},
|
||||
"required": ["time"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
]
|
||||
},
|
||||
"description": {"type": "string"},
|
||||
"status": {"type": "string"},
|
||||
"error": {
|
||||
"type": "array",
|
||||
"minItems": 3,
|
||||
"maxItems": 3,
|
||||
"items": {"type": "string"},
|
||||
},
|
||||
"output": OUTPUT_SCHEMA,
|
||||
},
|
||||
"required": [
|
||||
"hook",
|
||||
"started_at",
|
||||
"finished_at",
|
||||
"triggered_by",
|
||||
"description",
|
||||
"status",
|
||||
],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
TASK_RESULT_SCHEMA = {
|
||||
"type": "object",
|
||||
@ -133,6 +179,10 @@ TASK_RESULT_SCHEMA = {
|
||||
}
|
||||
}
|
||||
},
|
||||
"hooks": {
|
||||
"type": "array",
|
||||
"items": HOOK_RESULT_SCHEMA,
|
||||
},
|
||||
"result": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@ -220,6 +270,10 @@ TASK_EXTENDED_RESULT_SCHEMA = {
|
||||
}
|
||||
}
|
||||
},
|
||||
"hooks": {
|
||||
"type": "array",
|
||||
"items": HOOK_RESULT_SCHEMA,
|
||||
},
|
||||
"iterations": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
|
@ -181,6 +181,14 @@ class _ServiceType(utils.ImmutableMixin, utils.EnumMixin):
|
||||
return self.__names[service_type]
|
||||
|
||||
|
||||
class _HookStatus(utils.ImmutableMixin, utils.EnumMixin):
|
||||
"""Hook result statuses."""
|
||||
UNKNOWN = "n/a"
|
||||
SUCCESS = "success"
|
||||
FAILED = "failed"
|
||||
VALIDATION_FAILED = "validation_failed"
|
||||
|
||||
|
||||
TaskStatus = _TaskStatus()
|
||||
DeployStatus = _DeployStatus()
|
||||
EndpointPermission = _EndpointPermission()
|
||||
@ -189,3 +197,4 @@ Service = _Service()
|
||||
EndpointType = _EndpointType()
|
||||
TempestTestsAPI = _TempestTestsAPI()
|
||||
TempestTestsSets = _TempestTestsSets()
|
||||
HookStatus = _HookStatus()
|
||||
|
225
rally/task/hook.py
Normal file
225
rally/task/hook.py
Normal file
@ -0,0 +1,225 @@
|
||||
# Copyright 2016: Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import collections
|
||||
import threading
|
||||
|
||||
import jsonschema
|
||||
import six
|
||||
|
||||
from rally.common.i18n import _, _LE
|
||||
from rally.common import logging
|
||||
from rally.common import objects
|
||||
from rally.common.plugin import plugin
|
||||
from rally.common import utils as rutils
|
||||
from rally import consts
|
||||
from rally.task import trigger
|
||||
from rally.task import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
configure = plugin.configure
|
||||
|
||||
|
||||
class HookExecutor(object):
|
||||
"""Runs hooks and collects results from them."""
|
||||
|
||||
def __init__(self, config, task):
|
||||
self.config = config
|
||||
self.task = task
|
||||
self._timer_thread = threading.Thread(target=self._timer_method)
|
||||
self._timer_stop_event = threading.Event()
|
||||
|
||||
# map trigers to event types
|
||||
self.triggers = collections.defaultdict(list)
|
||||
for hook in config.get("hooks", []):
|
||||
trigger_obj = trigger.Trigger.get(
|
||||
hook["trigger"]["name"])(hook["trigger"]["args"])
|
||||
trigger_event_type = trigger_obj.get_configured_event_type()
|
||||
self.triggers[trigger_event_type].append(
|
||||
(trigger_obj, hook["name"], hook["args"], hook["description"])
|
||||
)
|
||||
|
||||
# list of executed hooks
|
||||
self.hooks = []
|
||||
|
||||
def _timer_method(self):
|
||||
"""Timer thread method.
|
||||
|
||||
It generates events with type "time" to inform HookExecutor
|
||||
about how many time passed since beginning of the first iteration.
|
||||
"""
|
||||
stopwatch = rutils.Stopwatch(stop_event=self._timer_stop_event)
|
||||
stopwatch.start()
|
||||
seconds_since_start = 0
|
||||
while not self._timer_stop_event.isSet():
|
||||
self.on_event(event_type="time", value=seconds_since_start)
|
||||
seconds_since_start += 1
|
||||
stopwatch.sleep(seconds_since_start)
|
||||
|
||||
def _start_timer(self):
|
||||
self._timer_thread.start()
|
||||
|
||||
def _stop_timer(self):
|
||||
self._timer_stop_event.set()
|
||||
if self._timer_thread.ident is not None:
|
||||
self._timer_thread.join()
|
||||
|
||||
def on_event(self, event_type, value):
|
||||
"""Notify about event.
|
||||
|
||||
This method should be called to inform HookExecutor that
|
||||
particular event occured.
|
||||
It runs hooks configured for event.
|
||||
"""
|
||||
# start timer on first iteration
|
||||
if self.triggers["time"]:
|
||||
if event_type == "iteration" and value == 1:
|
||||
self._start_timer()
|
||||
|
||||
triggers = self.triggers[event_type]
|
||||
for trigger_obj, hook_name, hook_args, hook_description in triggers:
|
||||
if trigger_obj.is_runnable(value=value):
|
||||
hook = Hook.get(hook_name)(
|
||||
config=hook_args, triggered_by={event_type: value},
|
||||
description=hook_description)
|
||||
self.hooks.append(hook)
|
||||
hook.run_async()
|
||||
LOG.info(_("Hook %s is trigged for Task %s by %s=%s")
|
||||
% (hook_name, self.task["uuid"], event_type, value))
|
||||
|
||||
def results(self):
|
||||
"""Returns list of dicts with hook results."""
|
||||
self._stop_timer()
|
||||
return [hook.result() for hook in self.hooks]
|
||||
|
||||
|
||||
@plugin.base()
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Hook(plugin.Plugin):
|
||||
"""Factory for hook classes."""
|
||||
|
||||
@classmethod
|
||||
def validate(cls, config):
|
||||
hook_schema = cls.get(config["name"]).CONFIG_SCHEMA
|
||||
jsonschema.validate(config["args"], hook_schema)
|
||||
|
||||
trigger.Trigger.validate(config["trigger"])
|
||||
|
||||
def __init__(self, config, triggered_by, description):
|
||||
self.config = config
|
||||
self._triggered_by = triggered_by
|
||||
self._description = description
|
||||
self._thread = threading.Thread(target=self._thread_method)
|
||||
self._started_at = 0.0
|
||||
self._finished_at = 0.0
|
||||
self._result = self._format_result(status=consts.HookStatus.UNKNOWN)
|
||||
|
||||
def _format_result(self, status, error=None):
|
||||
"""Returns hook result dict."""
|
||||
result = {
|
||||
"hook": self.get_name(),
|
||||
"status": status,
|
||||
"description": self._description,
|
||||
"started_at": self._started_at,
|
||||
"finished_at": self._finished_at,
|
||||
"triggered_by": self._triggered_by,
|
||||
}
|
||||
if error is not None:
|
||||
result["error"] = error
|
||||
return result
|
||||
|
||||
def set_error(self, exception_name, description, details):
|
||||
"""Set error related information to result.
|
||||
|
||||
:param exception_name: name of exception as string
|
||||
:param description: short description as string
|
||||
:param details: any details as string
|
||||
"""
|
||||
self._result["error"] = [exception_name, description, details]
|
||||
|
||||
def set_status(self, status):
|
||||
"""Set status to result."""
|
||||
self._result["status"] = status
|
||||
|
||||
def set_output(self, output):
|
||||
"""Set output to result.
|
||||
|
||||
:param output: Diagram data in task.OUTPUT_SCHEMA format
|
||||
"""
|
||||
self._result["output"] = output
|
||||
|
||||
def _thread_method(self):
|
||||
# Run hook synchronously
|
||||
self.run_sync()
|
||||
|
||||
try:
|
||||
self.validate_result_schema()
|
||||
except jsonschema.ValidationError as validation_error:
|
||||
LOG.error(_LE("Hook %s returned result "
|
||||
"in wrong format.") % self.get_name())
|
||||
LOG.exception(validation_error)
|
||||
|
||||
self._result = self._format_result(
|
||||
status=consts.HookStatus.VALIDATION_FAILED,
|
||||
error=utils.format_exc(validation_error),
|
||||
)
|
||||
|
||||
def validate_result_schema(self):
|
||||
"""Validates result format."""
|
||||
jsonschema.validate(self._result, objects.task.HOOK_RESULT_SCHEMA)
|
||||
|
||||
def run_async(self):
|
||||
"""Run hook asynchronously."""
|
||||
self._thread.start()
|
||||
|
||||
def run_sync(self):
|
||||
"""Run hook synchronously."""
|
||||
try:
|
||||
with rutils.Timer() as timer:
|
||||
self.run()
|
||||
except Exception as exc:
|
||||
LOG.error(_LE("Hook %s failed during run.") % self.get_name())
|
||||
LOG.exception(exc)
|
||||
self.set_status(consts.HookStatus.FAILED)
|
||||
self.set_error(*utils.format_exc(exc))
|
||||
|
||||
self._started_at = timer.timestamp()
|
||||
self._result["started_at"] = self._started_at
|
||||
self._finished_at = timer.finish_timestamp()
|
||||
self._result["finished_at"] = self._finished_at
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self):
|
||||
"""Run method.
|
||||
|
||||
This method should be implemented in plugin.
|
||||
|
||||
Hook plugin shoud call following methods to set result:
|
||||
set_result_status - to set success/failed status
|
||||
Optionally the following methods should be colled:
|
||||
set_result_error - to indicate that there was an error
|
||||
set_result_output - to provide diarmam data
|
||||
"""
|
||||
|
||||
def result(self):
|
||||
"""Wait and return result of hook."""
|
||||
if self._thread.ident is not None:
|
||||
# hook is stil running, wait for result
|
||||
self._thread.join()
|
||||
return self._result
|
@ -1713,6 +1713,9 @@ class FakeTimer(rally_utils.Timer):
|
||||
def timestamp(self):
|
||||
return 0
|
||||
|
||||
def finish_timestamp(self):
|
||||
return 3
|
||||
|
||||
|
||||
@context.configure(name="fake", order=1)
|
||||
class FakeContext(context.Context):
|
||||
|
258
tests/unit/task/test_hook.py
Normal file
258
tests/unit/task/test_hook.py
Normal file
@ -0,0 +1,258 @@
|
||||
# Copyright 2016: Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Tests for HookExecutor and Hook classes."""
|
||||
|
||||
import jsonschema
|
||||
import mock
|
||||
|
||||
from rally import consts
|
||||
from rally.task import hook
|
||||
from tests.unit import fakes
|
||||
from tests.unit import test
|
||||
|
||||
|
||||
@hook.configure(name="dummy_hook")
|
||||
class DummyHook(hook.Hook):
|
||||
CONFIG_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": {"type": "string"},
|
||||
"error": {"type": "array"},
|
||||
"output": {"type": "object"},
|
||||
},
|
||||
"required": ["status"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
def run(self):
|
||||
self.set_status(self.config["status"])
|
||||
|
||||
error = self.config.get("error")
|
||||
if error:
|
||||
self.set_error(*error)
|
||||
|
||||
output = self.config.get("output")
|
||||
if output:
|
||||
self.set_output(output)
|
||||
|
||||
|
||||
class HookExecutorTestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(HookExecutorTestCase, self).setUp()
|
||||
self.conf = {
|
||||
"hooks": [
|
||||
{
|
||||
"name": "dummy_hook",
|
||||
"description": "dummy_action",
|
||||
"args": {
|
||||
"status": consts.HookStatus.SUCCESS,
|
||||
},
|
||||
"trigger": {
|
||||
"name": "event",
|
||||
"args": {
|
||||
"unit": "iteration",
|
||||
"at": [1],
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
self.task = mock.MagicMock()
|
||||
|
||||
@mock.patch("rally.task.hook.HookExecutor._timer_method")
|
||||
@mock.patch("rally.common.utils.Timer", side_effect=fakes.FakeTimer)
|
||||
def test_results(self, mock_timer, mock__timer_method):
|
||||
hook_executor = hook.HookExecutor(self.conf, self.task)
|
||||
hook_executor.on_event(event_type="iteration", value=1)
|
||||
|
||||
self.assertEqual(
|
||||
[{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"triggered_by": {"iteration": 1},
|
||||
"started_at": fakes.FakeTimer().timestamp(),
|
||||
"finished_at": fakes.FakeTimer().finish_timestamp(),
|
||||
"status": consts.HookStatus.SUCCESS}], hook_executor.results())
|
||||
|
||||
@mock.patch("rally.task.hook.HookExecutor._timer_method")
|
||||
@mock.patch("rally.common.utils.Timer", side_effect=fakes.FakeTimer)
|
||||
def test_result_optional(self, mock_timer, mock__timer_method):
|
||||
hook_args = self.conf["hooks"][0]["args"]
|
||||
hook_args["error"] = ["Exception", "Description", "Traceback"]
|
||||
hook_args["output"] = {"additive": [], "complete": []}
|
||||
|
||||
hook_executor = hook.HookExecutor(self.conf, self.task)
|
||||
hook_executor.on_event(event_type="iteration", value=1)
|
||||
|
||||
self.assertEqual(
|
||||
[{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"triggered_by": {"iteration": 1},
|
||||
"started_at": fakes.FakeTimer().timestamp(),
|
||||
"finished_at": fakes.FakeTimer().finish_timestamp(),
|
||||
"error": ["Exception", "Description", "Traceback"],
|
||||
"output": {"additive": [], "complete": []},
|
||||
"status": consts.HookStatus.SUCCESS}], hook_executor.results())
|
||||
|
||||
def test_empty_result(self):
|
||||
hook_executor = hook.HookExecutor(self.conf, self.task)
|
||||
self.assertEqual([], hook_executor.results())
|
||||
|
||||
@mock.patch("rally.task.hook.HookExecutor._timer_method")
|
||||
@mock.patch.object(DummyHook, "run", side_effect=Exception("My err msg"))
|
||||
@mock.patch("rally.common.utils.Timer", side_effect=fakes.FakeTimer)
|
||||
def test_failed_result(self, mock_timer, mock_dummy_hook_run,
|
||||
mock__timer_method):
|
||||
hook_executor = hook.HookExecutor(self.conf, self.task)
|
||||
hook_executor.on_event(event_type="iteration", value=1)
|
||||
|
||||
self.assertEqual(
|
||||
[{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"triggered_by": {"iteration": 1},
|
||||
"error": ["Exception", "My err msg", mock.ANY],
|
||||
"started_at": fakes.FakeTimer().timestamp(),
|
||||
"finished_at": fakes.FakeTimer().finish_timestamp(),
|
||||
"status": consts.HookStatus.FAILED}], hook_executor.results())
|
||||
|
||||
@mock.patch("rally.task.hook.HookExecutor._timer_method")
|
||||
@mock.patch("rally.common.utils.Timer", side_effect=fakes.FakeTimer)
|
||||
def test_result_wrong_format(self, mock_timer, mock__timer_method):
|
||||
hook_args = self.conf["hooks"][0]["args"]
|
||||
hook_args["status"] = 10
|
||||
hook_executor = hook.HookExecutor(self.conf, self.task)
|
||||
hook_executor.on_event(event_type="iteration", value=1)
|
||||
|
||||
self.assertEqual(
|
||||
[{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"triggered_by": {"iteration": 1},
|
||||
"error": ["ValidationError", mock.ANY, mock.ANY],
|
||||
"started_at": fakes.FakeTimer().timestamp(),
|
||||
"finished_at": fakes.FakeTimer().finish_timestamp(),
|
||||
"status": consts.HookStatus.VALIDATION_FAILED}],
|
||||
hook_executor.results())
|
||||
|
||||
@mock.patch("rally.common.utils.Timer", side_effect=fakes.FakeTimer)
|
||||
def test_time_event(self, mock_timer):
|
||||
trigger_args = self.conf["hooks"][0]["trigger"]["args"]
|
||||
trigger_args["unit"] = "time"
|
||||
|
||||
hook_executor = hook.HookExecutor(self.conf, self.task)
|
||||
hook_executor.on_event(event_type="time", value=1)
|
||||
|
||||
self.assertEqual(
|
||||
[{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"triggered_by": {"time": 1},
|
||||
"started_at": fakes.FakeTimer().timestamp(),
|
||||
"finished_at": fakes.FakeTimer().finish_timestamp(),
|
||||
"status": consts.HookStatus.SUCCESS}], hook_executor.results())
|
||||
|
||||
@mock.patch("rally.common.utils.Stopwatch", autospec=True)
|
||||
@mock.patch("rally.common.utils.Timer", side_effect=fakes.FakeTimer)
|
||||
def test_timer_thread(self, mock_timer, mock_stopwatch):
|
||||
trigger_args = self.conf["hooks"][0]["trigger"]["args"]
|
||||
trigger_args["unit"] = "time"
|
||||
hook_executor = hook.HookExecutor(self.conf, self.task)
|
||||
|
||||
def stop_timer(sec):
|
||||
if sec == 3:
|
||||
hook_executor._timer_stop_event.set()
|
||||
|
||||
stopwatch_inst = mock_stopwatch.return_value
|
||||
stopwatch_inst.sleep.side_effect = stop_timer
|
||||
|
||||
hook_executor.on_event(event_type="iteration", value=1)
|
||||
self.assertTrue(hook_executor._timer_stop_event.wait(1))
|
||||
|
||||
self.assertEqual(
|
||||
[{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"triggered_by": {"time": 1},
|
||||
"started_at": fakes.FakeTimer().timestamp(),
|
||||
"finished_at": fakes.FakeTimer().finish_timestamp(),
|
||||
"status": consts.HookStatus.SUCCESS}], hook_executor.results())
|
||||
|
||||
stopwatch_inst.start.assert_called_once_with()
|
||||
stopwatch_inst.sleep.assert_has_calls([
|
||||
mock.call(1),
|
||||
mock.call(2),
|
||||
mock.call(3),
|
||||
])
|
||||
|
||||
|
||||
class HookTestCase(test.TestCase):
|
||||
|
||||
def test_validate(self):
|
||||
hook.Hook.validate(
|
||||
{
|
||||
"name": "dummy_hook",
|
||||
"description": "dummy_action",
|
||||
"args": {
|
||||
"status": consts.HookStatus.SUCCESS,
|
||||
},
|
||||
"trigger": {
|
||||
"name": "event",
|
||||
"args": {
|
||||
"unit": "iteration",
|
||||
"at": [1],
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
def test_validate_error(self):
|
||||
conf = {
|
||||
"name": "dummy_hook",
|
||||
"description": "dummy_action",
|
||||
"args": 3,
|
||||
"trigger": {
|
||||
"name": "event",
|
||||
"args": {
|
||||
"unit": "iteration",
|
||||
"at": [1],
|
||||
}
|
||||
}
|
||||
}
|
||||
self.assertRaises(jsonschema.ValidationError, hook.Hook.validate, conf)
|
||||
|
||||
@mock.patch("rally.common.utils.Timer", side_effect=fakes.FakeTimer)
|
||||
def test_result(self, mock_timer):
|
||||
dummy_hook = DummyHook({"status": consts.HookStatus.SUCCESS},
|
||||
{"iteration": 1}, "dummy_action")
|
||||
dummy_hook.run_sync()
|
||||
dummy_hook.validate_result_schema()
|
||||
|
||||
self.assertEqual(
|
||||
{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"started_at": fakes.FakeTimer().timestamp(),
|
||||
"finished_at": fakes.FakeTimer().finish_timestamp(),
|
||||
"triggered_by": {"iteration": 1},
|
||||
"status": consts.HookStatus.SUCCESS}, dummy_hook.result())
|
||||
|
||||
def test_result_not_started(self):
|
||||
dummy_hook = DummyHook({"status": consts.HookStatus.SUCCESS},
|
||||
{"iteration": 1}, "dummy_action")
|
||||
|
||||
self.assertEqual(
|
||||
{"description": "dummy_action",
|
||||
"hook": "dummy_hook",
|
||||
"started_at": 0.0,
|
||||
"finished_at": 0.0,
|
||||
"triggered_by": {"iteration": 1},
|
||||
"status": consts.HookStatus.UNKNOWN}, dummy_hook.result())
|
Loading…
x
Reference in New Issue
Block a user