diff --git a/doc/source/architecture.rst b/doc/source/architecture.rst index 26e7c7a86..cb13246de 100644 --- a/doc/source/architecture.rst +++ b/doc/source/architecture.rst @@ -261,6 +261,13 @@ previously created :ref:`Audit template `: .. image:: ./images/sequence_create_and_launch_audit.png :width: 100% +The :ref:`Administrator ` also can specify type of +Audit and interval (in case of CONTINUOUS type). There is two types of Audit: +ONESHOT and CONTINUOUS. Oneshot Audit is launched once and if it succeeded +executed new action plan list will be provided. Continuous Audit creates +action plans with specified interval (in seconds); if action plan +has been created, all previous action plans get CANCELLED state. + A message is sent on the :ref:`AMQP bus ` which triggers the Audit in the :ref:`Watcher Decision Engine `: diff --git a/doc/source/image_src/plantuml/watcher_db_schema_diagram.txt b/doc/source/image_src/plantuml/watcher_db_schema_diagram.txt index ab02d1c97..823e442e8 100644 --- a/doc/source/image_src/plantuml/watcher_db_schema_diagram.txt +++ b/doc/source/image_src/plantuml/watcher_db_schema_diagram.txt @@ -10,7 +10,7 @@ table(goal) { uuid : String[36] name : String[63] display_name : String[63] - + created_at : DateTime updated_at : DateTime deleted_at : DateTime @@ -24,7 +24,7 @@ table(strategy) { uuid : String[36] name : String[63] display_name : String[63] - + created_at : DateTime updated_at : DateTime deleted_at : DateTime @@ -42,7 +42,7 @@ table(audit_template) { host_aggregate : Integer, nullable extra : JSONEncodedDict version : String[15], nullable - + created_at : DateTime updated_at : DateTime deleted_at : DateTime @@ -54,10 +54,11 @@ table(audit) { primary_key(id: Integer) foreign_key("audit_template_id : Integer") uuid : String[36] - type : String[20] + audit_type : String[20] state : String[20], nullable deadline :DateTime, nullable - + interval : Integer, nullable + created_at : DateTime updated_at : DateTime deleted_at : DateTime @@ -72,7 +73,7 @@ table(action_plan) { first_action_id : Integer state : String[20], nullable global_efficacy : JSONEncodedDict, nullable - + created_at : DateTime updated_at : DateTime deleted_at : DateTime @@ -88,7 +89,7 @@ table(action) { input_parameters : JSONEncodedDict, nullable state : String[20], nullable next : String[36], nullable - + created_at : DateTime updated_at : DateTime deleted_at : DateTime diff --git a/doc/source/images/watcher_db_schema_diagram.png b/doc/source/images/watcher_db_schema_diagram.png index a43c78faa..f85758dac 100644 Binary files a/doc/source/images/watcher_db_schema_diagram.png and b/doc/source/images/watcher_db_schema_diagram.png differ diff --git a/requirements.txt b/requirements.txt index a71b964dc..74cb8d3d8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +apscheduler # MIT License enum34;python_version=='2.7' or python_version=='2.6' or python_version=='3.3' # BSD jsonpatch>=1.1 # BSD keystoneauth1>=2.7.0 # Apache-2.0 diff --git a/watcher/api/controllers/v1/audit.py b/watcher/api/controllers/v1/audit.py index 02c16fac6..689d82626 100644 --- a/watcher/api/controllers/v1/audit.py +++ b/watcher/api/controllers/v1/audit.py @@ -63,25 +63,45 @@ class AuditPostType(wtypes.Base): parameters = wtypes.wsattr({wtypes.text: types.jsontype}, mandatory=False, default={}) + interval = wsme.wsattr(int, mandatory=False) def as_audit(self): audit_type_values = [val.value for val in objects.audit.AuditType] if self.audit_type not in audit_type_values: raise exception.AuditTypeNotFound(audit_type=self.audit_type) + if (self.audit_type == objects.audit.AuditType.ONESHOT.value and + self.interval != wtypes.Unset): + raise exception.AuditIntervalNotAllowed(audit_type=self.audit_type) + + if (self.audit_type == objects.audit.AuditType.CONTINUOUS.value and + self.interval == wtypes.Unset): + raise exception.AuditIntervalNotSpecified( + audit_type=self.audit_type) + return Audit( audit_template_id=self.audit_template_uuid, audit_type=self.audit_type, deadline=self.deadline, parameters=self.parameters, - ) + interval=self.interval) class AuditPatchType(types.JsonPatchType): @staticmethod def mandatory_attrs(): - return ['/audit_template_uuid'] + return ['/audit_template_uuid', '/type'] + + @staticmethod + def validate(patch): + serialized_patch = {'path': patch.path, 'op': patch.op} + if patch.path in AuditPatchType.mandatory_attrs(): + msg = _("%(field)s can't be updated.") + raise exception.PatchError( + patch=serialized_patch, + reason=msg % dict(field=patch.path)) + return types.JsonPatchType.validate(patch) class Audit(base.APIBase): @@ -160,6 +180,9 @@ class Audit(base.APIBase): links = wsme.wsattr([link.Link], readonly=True) """A list containing a self link and associated audit links""" + interval = wsme.wsattr(int, mandatory=False) + """Launch audit periodically (in seconds)""" + def __init__(self, **kwargs): self.fields = [] fields = list(objects.Audit.fields) @@ -187,7 +210,7 @@ class Audit(base.APIBase): if not expand: audit.unset_fields_except(['uuid', 'audit_type', 'deadline', 'state', 'audit_template_uuid', - 'audit_template_name']) + 'audit_template_name', 'interval']) # The numeric ID should not be exposed to # the user, it's internal only. @@ -215,7 +238,8 @@ class Audit(base.APIBase): deadline=None, created_at=datetime.datetime.utcnow(), deleted_at=None, - updated_at=datetime.datetime.utcnow()) + updated_at=datetime.datetime.utcnow(), + interval=7200) sample._audit_template_uuid = '7ae81bb3-dec3-4289-8d6c-da80bd8001ae' return cls._convert_with_links(sample, 'http://localhost:9322', expand) @@ -414,8 +438,9 @@ class AuditsController(rest.RestController): # trigger decision-engine to run the audit - dc_client = rpcapi.DecisionEngineAPI() - dc_client.trigger_audit(context, new_audit.uuid) + if new_audit.audit_type == objects.audit.AuditType.ONESHOT.value: + dc_client = rpcapi.DecisionEngineAPI() + dc_client.trigger_audit(context, new_audit.uuid) return Audit.convert_with_links(new_audit) diff --git a/watcher/common/exception.py b/watcher/common/exception.py index 95c1f0a0a..5e3e23cd3 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -221,6 +221,14 @@ class AuditAlreadyExists(Conflict): msg_fmt = _("An audit with UUID %(uuid)s already exists") +class AuditIntervalNotSpecified(Invalid): + msg_fmt = _("Interval of audit must be specified for %(audit_type)s.") + + +class AuditIntervalNotAllowed(Invalid): + msg_fmt = _("Interval of audit must not be set for %(audit_type)s.") + + class AuditReferenced(Invalid): msg_fmt = _("Audit %(audit)s is referenced by one or multiple action " "plans") diff --git a/watcher/db/sqlalchemy/models.py b/watcher/db/sqlalchemy/models.py index 9b265ced4..f6ca6f515 100644 --- a/watcher/db/sqlalchemy/models.py +++ b/watcher/db/sqlalchemy/models.py @@ -177,6 +177,7 @@ class Audit(Base): audit_template_id = Column(Integer, ForeignKey('audit_templates.id'), nullable=False) parameters = Column(JSONEncodedDict, nullable=True) + interval = Column(Integer, nullable=True) class Action(Base): diff --git a/watcher/decision_engine/audit/base.py b/watcher/decision_engine/audit/base.py index 743ee1ffd..e44d6fded 100644 --- a/watcher/decision_engine/audit/base.py +++ b/watcher/decision_engine/audit/base.py @@ -2,6 +2,7 @@ # Copyright (c) 2015 b<>com # # Authors: Jean-Emile DARTOIS +# Alexander Chadin # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,9 +20,92 @@ import abc import six +from oslo_log import log + +from watcher.common.messaging.events import event as watcher_event +from watcher.decision_engine.messaging import events as de_events +from watcher.decision_engine.planner import manager as planner_manager +from watcher.decision_engine.strategy.context import default as default_context +from watcher.objects import audit as audit_objects + +LOG = log.getLogger(__name__) + @six.add_metaclass(abc.ABCMeta) class BaseAuditHandler(object): @abc.abstractmethod def execute(self, audit_uuid, request_context): raise NotImplementedError() + + @abc.abstractmethod + def pre_execute(self, audit_uuid, request_context): + raise NotImplementedError() + + @abc.abstractmethod + def do_execute(self, audit, request_context): + raise NotImplementedError() + + @abc.abstractmethod + def post_execute(self, audit, solution, request_context): + raise NotImplementedError() + + +@six.add_metaclass(abc.ABCMeta) +class AuditHandler(BaseAuditHandler): + def __init__(self, messaging): + self._messaging = messaging + self._strategy_context = default_context.DefaultStrategyContext() + self._planner_manager = planner_manager.PlannerManager() + self._planner = None + + @property + def planner(self): + if self._planner is None: + self._planner = self._planner_manager.load() + return self._planner + + @property + def messaging(self): + return self._messaging + + @property + def strategy_context(self): + return self._strategy_context + + def notify(self, audit_uuid, event_type, status): + event = watcher_event.Event() + event.type = event_type + event.data = {} + payload = {'audit_uuid': audit_uuid, + 'audit_status': status} + self.messaging.status_topic_handler.publish_event( + event.type.name, payload) + + def update_audit_state(self, request_context, audit, state): + LOG.debug("Update audit state: %s", state) + audit.state = state + audit.save() + self.notify(audit.uuid, de_events.Events.TRIGGER_AUDIT, state) + + def pre_execute(self, audit, request_context): + LOG.debug("Trigger audit %s", audit.uuid) + # change state of the audit to ONGOING + self.update_audit_state(request_context, audit, + audit_objects.State.ONGOING) + + def post_execute(self, audit, solution, request_context): + self.planner.schedule(request_context, audit.id, solution) + + # change state of the audit to SUCCEEDED + self.update_audit_state(request_context, audit, + audit_objects.State.SUCCEEDED) + + def execute(self, audit, request_context): + try: + self.pre_execute(audit, request_context) + solution = self.do_execute(audit, request_context) + self.post_execute(audit, solution, request_context) + except Exception as e: + LOG.exception(e) + self.update_audit_state(request_context, audit, + audit_objects.State.FAILED) diff --git a/watcher/decision_engine/audit/continuous.py b/watcher/decision_engine/audit/continuous.py new file mode 100644 index 000000000..3aaf09e8a --- /dev/null +++ b/watcher/decision_engine/audit/continuous.py @@ -0,0 +1,126 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Servionica LTD +# +# Authors: Alexander Chadin +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import datetime + +from apscheduler.schedulers import background + +from oslo_config import cfg + +from watcher.common import context +from watcher.decision_engine.audit import base +from watcher.objects import action_plan as action_objects +from watcher.objects import audit as audit_objects + +CONF = cfg.CONF + +WATCHER_CONTINUOUS_OPTS = [ + cfg.IntOpt('continuous_audit_interval', + default=10, + help='Interval, in seconds, for checking new created' + 'continuous audit.') +] + +CONF.register_opts(WATCHER_CONTINUOUS_OPTS, 'watcher_decision_engine') + + +class ContinuousAuditHandler(base.AuditHandler): + def __init__(self, messaging): + super(ContinuousAuditHandler, self).__init__(messaging) + self._scheduler = None + self.jobs = [] + self._start() + self.context_show_deleted = context.RequestContext(is_admin=True, + show_deleted=True) + + @property + def scheduler(self): + if self._scheduler is None: + self._scheduler = background.BackgroundScheduler() + return self._scheduler + + def _is_audit_inactive(self, audit): + audit = audit_objects.Audit.get_by_uuid(self.context_show_deleted, + audit.uuid) + if audit.state in (audit_objects.State.CANCELLED, + audit_objects.State.DELETED, + audit_objects.State.FAILED): + # if audit isn't in active states, audit's job must be removed to + # prevent using of inactive audit in future. + job_to_delete = [job for job in self.jobs + if job.keys()[0] == audit.uuid][0] + self.jobs.remove(job_to_delete) + job_to_delete[audit.uuid].remove() + + return True + + return False + + def do_execute(self, audit, request_context): + # execute the strategy + solution = self.strategy_context.execute_strategy(audit.uuid, + request_context) + + if audit.audit_type == audit_objects.AuditType.CONTINUOUS.value: + a_plan_filters = {'audit_uuid': audit.uuid, + 'state': action_objects.State.RECOMMENDED} + action_plans = action_objects.ActionPlan.list( + request_context, + filters=a_plan_filters) + for plan in action_plans: + plan.state = action_objects.State.CANCELLED + plan.save() + return solution + + def execute_audit(self, audit, request_context): + if not self._is_audit_inactive(audit): + self.execute(audit, request_context) + + def post_execute(self, audit, solution, request_context): + self.planner.schedule(request_context, audit.id, solution) + + def launch_audits_periodically(self): + audit_context = context.RequestContext(is_admin=True) + audit_filters = { + 'audit_type': audit_objects.AuditType.CONTINUOUS.value, + 'state__in': (audit_objects.State.PENDING, + audit_objects.State.ONGOING, + audit_objects.State.SUCCEEDED) + } + audits = audit_objects.Audit.list(audit_context, + filters=audit_filters) + scheduler_job_args = [job.args for job in self.scheduler.get_jobs() + if job.name == 'execute_audit'] + for audit in audits: + if audit.uuid not in [arg[0].uuid for arg in scheduler_job_args]: + job = self.scheduler.add_job( + self.execute_audit, 'interval', + args=[audit, audit_context], + seconds=audit.interval, + name='execute_audit', + next_run_time=datetime.datetime.now()) + self.jobs.append({audit.uuid: job}) + + def _start(self): + self.scheduler.add_job( + self.launch_audits_periodically, + 'interval', + seconds=CONF.watcher_decision_engine.continuous_audit_interval, + next_run_time=datetime.datetime.now()) + self.scheduler.start() diff --git a/watcher/decision_engine/audit/default.py b/watcher/decision_engine/audit/default.py deleted file mode 100644 index 4ea64d179..000000000 --- a/watcher/decision_engine/audit/default.py +++ /dev/null @@ -1,88 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_log import log - -from watcher.common.messaging.events import event as watcher_event -from watcher.decision_engine.audit import base -from watcher.decision_engine.messaging import events as de_events -from watcher.decision_engine.planner import manager as planner_manager -from watcher.decision_engine.strategy.context import default as default_context -from watcher.objects import audit as audit_objects - - -LOG = log.getLogger(__name__) - - -class DefaultAuditHandler(base.BaseAuditHandler): - def __init__(self, messaging): - super(DefaultAuditHandler, self).__init__() - self._messaging = messaging - self._strategy_context = default_context.DefaultStrategyContext() - self._planner_manager = planner_manager.PlannerManager() - self._planner = None - - @property - def planner(self): - if self._planner is None: - self._planner = self._planner_manager.load() - return self._planner - - @property - def messaging(self): - return self._messaging - - @property - def strategy_context(self): - return self._strategy_context - - def notify(self, audit_uuid, event_type, status): - event = watcher_event.Event() - event.type = event_type - event.data = {} - payload = {'audit_uuid': audit_uuid, - 'audit_status': status} - self.messaging.status_topic_handler.publish_event( - event.type.name, payload) - - def update_audit_state(self, request_context, audit_uuid, state): - LOG.debug("Update audit state: %s", state) - audit = audit_objects.Audit.get_by_uuid(request_context, audit_uuid) - audit.state = state - audit.save() - self.notify(audit_uuid, de_events.Events.TRIGGER_AUDIT, state) - return audit - - def execute(self, audit_uuid, request_context): - try: - LOG.debug("Trigger audit %s", audit_uuid) - # change state of the audit to ONGOING - audit = self.update_audit_state(request_context, audit_uuid, - audit_objects.State.ONGOING) - - # execute the strategy - solution = self.strategy_context.execute_strategy(audit_uuid, - request_context) - - self.planner.schedule(request_context, audit.id, solution) - - # change state of the audit to SUCCEEDED - self.update_audit_state(request_context, audit_uuid, - audit_objects.State.SUCCEEDED) - except Exception as e: - LOG.exception(e) - self.update_audit_state(request_context, audit_uuid, - audit_objects.State.FAILED) diff --git a/watcher/decision_engine/audit/oneshot.py b/watcher/decision_engine/audit/oneshot.py new file mode 100644 index 000000000..d7b68d926 --- /dev/null +++ b/watcher/decision_engine/audit/oneshot.py @@ -0,0 +1,26 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from watcher.decision_engine.audit import base + + +class OneShotAuditHandler(base.AuditHandler): + def do_execute(self, audit, request_context): + # execute the strategy + solution = self.strategy_context.execute_strategy(audit.uuid, + request_context) + + return solution diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index ce606b4e2..a5c5f836b 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -21,7 +21,9 @@ from concurrent import futures from oslo_config import cfg from oslo_log import log -from watcher.decision_engine.audit import default +from watcher.decision_engine.audit import continuous as continuous_handler +from watcher.decision_engine.audit import oneshot as oneshot_handler +from watcher.objects import audit as audit_objects CONF = cfg.CONF LOG = log.getLogger(__name__) @@ -32,6 +34,10 @@ class AuditEndpoint(object): self._messaging = messaging self._executor = futures.ThreadPoolExecutor( max_workers=CONF.watcher_decision_engine.max_workers) + self._oneshot_handler = oneshot_handler.OneShotAuditHandler( + self.messaging) + self._continuous_handler = continuous_handler.ContinuousAuditHandler( + self.messaging) @property def executor(self): @@ -42,8 +48,8 @@ class AuditEndpoint(object): return self._messaging def do_trigger_audit(self, context, audit_uuid): - audit = default.DefaultAuditHandler(self.messaging) - audit.execute(audit_uuid, context) + audit = audit_objects.Audit.get_by_uuid(context, audit_uuid) + self._oneshot_handler.execute(audit, context) def trigger_audit(self, context, audit_uuid): LOG.debug("Trigger audit %s" % audit_uuid) diff --git a/watcher/objects/audit.py b/watcher/objects/audit.py index 422d93519..ff04570ed 100644 --- a/watcher/objects/audit.py +++ b/watcher/objects/audit.py @@ -86,6 +86,7 @@ class Audit(base.WatcherObject): 'deadline': obj_utils.datetime_or_str_or_none, 'audit_template_id': obj_utils.int_or_none, 'parameters': obj_utils.dict_or_none, + 'interval': obj_utils.int_or_none, } @staticmethod diff --git a/watcher/tests/api/v1/test_audits.py b/watcher/tests/api/v1/test_audits.py index e8f8b99fd..f1135281e 100644 --- a/watcher/tests/api/v1/test_audits.py +++ b/watcher/tests/api/v1/test_audits.py @@ -477,6 +477,7 @@ class TestPost(api_base.FunctionalTest): audit_dict = post_get_test_audit(state=objects.audit.State.PENDING) del audit_dict['uuid'] del audit_dict['state'] + del audit_dict['interval'] response = self.post_json('/audits', audit_dict) self.assertEqual('application/json', response.content_type) @@ -517,6 +518,7 @@ class TestPost(api_base.FunctionalTest): audit_dict = post_get_test_audit() del audit_dict['uuid'] del audit_dict['state'] + del audit_dict['interval'] # Make the audit template UUID some garbage value audit_dict['audit_template_uuid'] = ( '01234567-8910-1112-1314-151617181920') @@ -537,6 +539,7 @@ class TestPost(api_base.FunctionalTest): state = audit_dict['state'] del audit_dict['uuid'] del audit_dict['state'] + del audit_dict['interval'] with mock.patch.object(self.dbapi, 'create_audit', wraps=self.dbapi.create_audit) as cn_mock: response = self.post_json('/audits', audit_dict) @@ -552,6 +555,7 @@ class TestPost(api_base.FunctionalTest): audit_dict = post_get_test_audit() del audit_dict['uuid'] del audit_dict['state'] + del audit_dict['interval'] response = self.post_json('/audits', audit_dict) self.assertEqual('application/json', response.content_type) @@ -560,12 +564,66 @@ class TestPost(api_base.FunctionalTest): response.json['state']) self.assertTrue(utils.is_uuid_like(response.json['uuid'])) + @mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit') + def test_create_continuous_audit_with_period(self, mock_trigger_audit): + mock_trigger_audit.return_value = mock.ANY + + audit_dict = post_get_test_audit() + del audit_dict['uuid'] + del audit_dict['state'] + audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value + audit_dict['interval'] = 1200 + + response = self.post_json('/audits', audit_dict) + self.assertEqual('application/json', response.content_type) + self.assertEqual(201, response.status_int) + self.assertEqual(objects.audit.State.PENDING, + response.json['state']) + self.assertEqual(audit_dict['interval'], response.json['interval']) + self.assertTrue(utils.is_uuid_like(response.json['uuid'])) + + @mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit') + def test_create_continuous_audit_without_period(self, mock_trigger_audit): + mock_trigger_audit.return_value = mock.ANY + + audit_dict = post_get_test_audit() + del audit_dict['uuid'] + del audit_dict['state'] + audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value + del audit_dict['interval'] + + response = self.post_json('/audits', audit_dict, expect_errors=True) + self.assertEqual(400, response.status_int) + self.assertEqual('application/json', response.content_type) + expected_error_msg = ('Interval of audit must be specified ' + 'for CONTINUOUS.') + self.assertTrue(response.json['error_message']) + self.assertTrue(expected_error_msg in response.json['error_message']) + + @mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit') + def test_create_oneshot_audit_with_period(self, mock_trigger_audit): + mock_trigger_audit.return_value = mock.ANY + + audit_dict = post_get_test_audit() + del audit_dict['uuid'] + del audit_dict['state'] + audit_dict['audit_type'] = objects.audit.AuditType.ONESHOT.value + audit_dict['interval'] = 1200 + + response = self.post_json('/audits', audit_dict, expect_errors=True) + self.assertEqual(400, response.status_int) + self.assertEqual('application/json', response.content_type) + expected_error_msg = 'Interval of audit must not be set for ONESHOT.' + self.assertTrue(response.json['error_message']) + self.assertTrue(expected_error_msg in response.json['error_message']) + def test_create_audit_trigger_decision_engine(self): with mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit') as de_mock: audit_dict = post_get_test_audit(state=objects.audit.State.PENDING) del audit_dict['uuid'] del audit_dict['state'] + del audit_dict['interval'] response = self.post_json('/audits', audit_dict) de_mock.assert_called_once_with(mock.ANY, response.json['uuid']) @@ -586,6 +644,7 @@ class TestPost(api_base.FunctionalTest): audit_dict = post_get_test_audit(parameters={'name': 'Tom'}) del audit_dict['uuid'] del audit_dict['state'] + del audit_dict['interval'] response = self.post_json('/audits', audit_dict, expect_errors=True) self.assertEqual('application/json', response.content_type) @@ -605,6 +664,7 @@ class TestPost(api_base.FunctionalTest): parameters={'name': 'Tom'}) del audit_dict['uuid'] del audit_dict['state'] + del audit_dict['interval'] response = self.post_json('/audits', audit_dict, expect_errors=True) self.assertEqual('application/json', response.content_type) diff --git a/watcher/tests/db/utils.py b/watcher/tests/db/utils.py index 0d0c43857..881e4154a 100644 --- a/watcher/tests/db/utils.py +++ b/watcher/tests/db/utils.py @@ -62,6 +62,7 @@ def get_test_audit(**kwargs): 'updated_at': kwargs.get('updated_at'), 'deleted_at': kwargs.get('deleted_at'), 'parameters': kwargs.get('parameters', {}), + 'interval': kwargs.get('period', 3600), } diff --git a/watcher/tests/decision_engine/audit/test_audit_handlers.py b/watcher/tests/decision_engine/audit/test_audit_handlers.py new file mode 100644 index 000000000..590a98a01 --- /dev/null +++ b/watcher/tests/decision_engine/audit/test_audit_handlers.py @@ -0,0 +1,139 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import mock +import uuid + +from apscheduler.schedulers import background + +from watcher.decision_engine.audit import continuous +from watcher.decision_engine.audit import oneshot +from watcher.decision_engine.messaging import events +from watcher.metrics_engine.cluster_model_collector import manager +from watcher.objects import audit as audit_objects +from watcher.tests.db import base +from watcher.tests.decision_engine.strategy.strategies import \ + faker_cluster_state as faker +from watcher.tests.objects import utils as obj_utils + + +class TestOneShotAuditHandler(base.DbTestCase): + def setUp(self): + super(TestOneShotAuditHandler, self).setUp() + obj_utils.create_test_goal(self.context, id=1, name="dummy") + audit_template = obj_utils.create_test_audit_template( + self.context) + self.audit = obj_utils.create_test_audit( + self.context, + audit_template_id=audit_template.id) + + @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") + def test_trigger_audit_without_errors(self, mock_collector): + mock_collector.return_value = faker.FakerModelCollector() + audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock()) + audit_handler.execute(self.audit, self.context) + + @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") + def test_trigger_audit_state_succeeded(self, mock_collector): + mock_collector.return_value = faker.FakerModelCollector() + audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock()) + audit_handler.execute(self.audit, self.context) + audit = audit_objects.Audit.get_by_uuid(self.context, self.audit.uuid) + self.assertEqual(audit_objects.State.SUCCEEDED, audit.state) + + @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") + def test_trigger_audit_send_notification(self, mock_collector): + messaging = mock.MagicMock() + mock_collector.return_value = faker.FakerModelCollector() + audit_handler = oneshot.OneShotAuditHandler(messaging) + audit_handler.execute(self.audit, self.context) + + call_on_going = mock.call(events.Events.TRIGGER_AUDIT.name, { + 'audit_status': audit_objects.State.ONGOING, + 'audit_uuid': self.audit.uuid}) + call_succeeded = mock.call(events.Events.TRIGGER_AUDIT.name, { + 'audit_status': audit_objects.State.SUCCEEDED, + 'audit_uuid': self.audit.uuid}) + + calls = [call_on_going, call_succeeded] + messaging.status_topic_handler.publish_event.assert_has_calls(calls) + self.assertEqual( + 2, messaging.status_topic_handler.publish_event.call_count) + + +class TestContinuousAuditHandler(base.DbTestCase): + def setUp(self): + super(TestContinuousAuditHandler, self).setUp() + obj_utils.create_test_goal(self.context, id=1, name="DUMMY") + audit_template = obj_utils.create_test_audit_template( + self.context) + self.audits = [obj_utils.create_test_audit( + self.context, + uuid=uuid.uuid4(), + audit_template_id=audit_template.id, + audit_type=audit_objects.AuditType.CONTINUOUS.value) + for i in range(2)] + + @mock.patch.object(background.BackgroundScheduler, 'add_job') + @mock.patch.object(background.BackgroundScheduler, 'get_jobs') + @mock.patch.object(audit_objects.Audit, 'list') + def test_launch_audits_periodically(self, mock_list, + mock_jobs, mock_add_job): + audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock()) + audits = [audit_objects.Audit.get_by_uuid(self.context, + self.audits[0].uuid)] + mock_list.return_value = audits + mock_jobs.return_value = mock.MagicMock() + audit_handler.launch_audits_periodically() + mock_add_job.assert_called() + + @mock.patch.object(background.BackgroundScheduler, 'add_job') + @mock.patch.object(background.BackgroundScheduler, 'get_jobs') + @mock.patch.object(audit_objects.Audit, 'list') + def test_launch_multiply_audits_periodically(self, mock_list, + mock_jobs, mock_add_job): + audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock()) + audits = [audit_objects.Audit.get_by_uuid( + self.context, + audit.uuid) for audit in self.audits] + mock_list.return_value = audits + mock_jobs.return_value = mock.MagicMock() + calls = [mock.call(audit_handler.execute_audit, 'interval', + args=[mock.ANY, mock.ANY], + seconds=3600, + name='execute_audit', + next_run_time=mock.ANY) for audit in self.audits] + audit_handler.launch_audits_periodically() + mock_add_job.assert_has_calls(calls) + + @mock.patch.object(background.BackgroundScheduler, 'add_job') + @mock.patch.object(background.BackgroundScheduler, 'get_jobs') + @mock.patch.object(audit_objects.Audit, 'list') + def test_period_audit_not_called_when_deleted(self, mock_list, + mock_jobs, mock_add_job): + audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock()) + audits = [audit_objects.Audit.get_by_uuid( + self.context, + audit.uuid) for audit in self.audits] + mock_list.return_value = audits + mock_jobs.return_value = mock.MagicMock() + audits[1].state = audit_objects.State.CANCELLED + calls = [mock.call(audit_handler.execute_audit, 'interval', + args=[mock.ANY, mock.ANY], + seconds=3600, + name='execute_audit', + next_run_time=mock.ANY)] + audit_handler.launch_audits_periodically() + mock_add_job.assert_has_calls(calls) diff --git a/watcher/tests/decision_engine/audit/test_default_audit_handler.py b/watcher/tests/decision_engine/audit/test_default_audit_handler.py deleted file mode 100644 index 005849822..000000000 --- a/watcher/tests/decision_engine/audit/test_default_audit_handler.py +++ /dev/null @@ -1,69 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import mock - -from watcher.decision_engine.audit import default as default -from watcher.decision_engine.messaging import events -from watcher.metrics_engine.cluster_model_collector import manager -from watcher.objects import audit as audit_objects -from watcher.tests.db import base -from watcher.tests.decision_engine.strategy.strategies import \ - faker_cluster_state as faker -from watcher.tests.objects import utils as obj_utils - - -class TestDefaultAuditHandler(base.DbTestCase): - def setUp(self): - super(TestDefaultAuditHandler, self).setUp() - obj_utils.create_test_goal(self.context, id=1, name="dummy") - audit_template = obj_utils.create_test_audit_template( - self.context) - self.audit = obj_utils.create_test_audit( - self.context, - audit_template_id=audit_template.id) - - @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") - def test_trigger_audit_without_errors(self, mock_collector): - mock_collector.return_value = faker.FakerModelCollector() - audit_handler = default.DefaultAuditHandler(mock.MagicMock()) - audit_handler.execute(self.audit.uuid, self.context) - - @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") - def test_trigger_audit_state_succeeded(self, mock_collector): - mock_collector.return_value = faker.FakerModelCollector() - audit_handler = default.DefaultAuditHandler(mock.MagicMock()) - audit_handler.execute(self.audit.uuid, self.context) - audit = audit_objects.Audit.get_by_uuid(self.context, self.audit.uuid) - self.assertEqual(audit_objects.State.SUCCEEDED, audit.state) - - @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") - def test_trigger_audit_send_notification(self, mock_collector): - messaging = mock.MagicMock() - mock_collector.return_value = faker.FakerModelCollector() - audit_handler = default.DefaultAuditHandler(messaging) - audit_handler.execute(self.audit.uuid, self.context) - - call_on_going = mock.call(events.Events.TRIGGER_AUDIT.name, { - 'audit_status': audit_objects.State.ONGOING, - 'audit_uuid': self.audit.uuid}) - call_succeeded = mock.call(events.Events.TRIGGER_AUDIT.name, { - 'audit_status': audit_objects.State.SUCCEEDED, - 'audit_uuid': self.audit.uuid}) - - calls = [call_on_going, call_succeeded] - messaging.status_topic_handler.publish_event.assert_has_calls(calls) - self.assertEqual( - 2, messaging.status_topic_handler.publish_event.call_count) diff --git a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py index 118c3a63c..6f5b1d6fc 100644 --- a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py +++ b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py @@ -16,8 +16,7 @@ import mock -from watcher.common import utils -from watcher.decision_engine.audit import default +from watcher.decision_engine.audit import oneshot as oneshot_handler from watcher.decision_engine.messaging import audit_endpoint from watcher.metrics_engine.cluster_model_collector import manager from watcher.tests.db import base @@ -38,28 +37,29 @@ class TestAuditEndpoint(base.DbTestCase): @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") def test_do_trigger_audit(self, mock_collector): mock_collector.return_value = faker_cluster_state.FakerModelCollector() - audit_uuid = utils.generate_uuid() - audit_handler = default.DefaultAuditHandler(mock.MagicMock()) + audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock()) endpoint = audit_endpoint.AuditEndpoint(audit_handler) - with mock.patch.object(default.DefaultAuditHandler, + with mock.patch.object(oneshot_handler.OneShotAuditHandler, 'execute') as mock_call: mock_call.return_value = 0 - endpoint.do_trigger_audit(audit_handler, audit_uuid) + endpoint.do_trigger_audit(self.context, self.audit.uuid) - mock_call.assert_called_once_with(audit_uuid, audit_handler) + self.assertEqual(mock_call.call_count, 1) @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") def test_trigger_audit(self, mock_collector): mock_collector.return_value = faker_cluster_state.FakerModelCollector() - audit_uuid = utils.generate_uuid() - audit_handler = default.DefaultAuditHandler(mock.MagicMock()) + + audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock()) endpoint = audit_endpoint.AuditEndpoint(audit_handler) - with mock.patch.object(default.DefaultAuditHandler, 'execute') \ - as mock_call: - mock_call.return_value = 0 - endpoint.trigger_audit(audit_handler, audit_uuid) + with mock.patch.object(endpoint.executor, 'submit') as mock_call: + mock_execute = mock.call(endpoint.do_trigger_audit, + self.context, + self.audit.uuid) + endpoint.trigger_audit(self.context, self.audit.uuid) - mock_call.assert_called_once_with(audit_uuid, audit_handler) + mock_call.assert_has_calls([mock_execute]) + self.assertEqual(mock_call.call_count, 1)