Add continuously optimization
This patch set adds implementation for CONTINUOUS type of audit. Change-Id: I5f4ec97b2082c8a6b3ccebe36b2a343fa4a67d19 Implements: blueprint continuously-optimization
This commit is contained in:
parent
518b4c82f1
commit
1de00086f5
@ -261,6 +261,13 @@ previously created :ref:`Audit template <audit_template_definition>`:
|
||||
.. image:: ./images/sequence_create_and_launch_audit.png
|
||||
:width: 100%
|
||||
|
||||
The :ref:`Administrator <administrator_definition>` also can specify type of
|
||||
Audit and interval (in case of CONTINUOUS type). There is two types of Audit:
|
||||
ONESHOT and CONTINUOUS. Oneshot Audit is launched once and if it succeeded
|
||||
executed new action plan list will be provided. Continuous Audit creates
|
||||
action plans with specified interval (in seconds); if action plan
|
||||
has been created, all previous action plans get CANCELLED state.
|
||||
|
||||
A message is sent on the :ref:`AMQP bus <amqp_bus_definition>` which triggers
|
||||
the Audit in the
|
||||
:ref:`Watcher Decision Engine <watcher_decision_engine_definition>`:
|
||||
|
@ -54,9 +54,10 @@ table(audit) {
|
||||
primary_key(id: Integer)
|
||||
foreign_key("audit_template_id : Integer")
|
||||
uuid : String[36]
|
||||
type : String[20]
|
||||
audit_type : String[20]
|
||||
state : String[20], nullable
|
||||
deadline :DateTime, nullable
|
||||
interval : Integer, nullable
|
||||
|
||||
created_at : DateTime
|
||||
updated_at : DateTime
|
||||
|
Binary file not shown.
Before Width: | Height: | Size: 49 KiB After Width: | Height: | Size: 64 KiB |
@ -2,6 +2,7 @@
|
||||
# of appearance. Changing the order has an impact on the overall integration
|
||||
# process, which may cause wedges in the gate later.
|
||||
|
||||
apscheduler # MIT License
|
||||
enum34;python_version=='2.7' or python_version=='2.6' or python_version=='3.3' # BSD
|
||||
jsonpatch>=1.1 # BSD
|
||||
keystoneauth1>=2.7.0 # Apache-2.0
|
||||
|
@ -63,25 +63,45 @@ class AuditPostType(wtypes.Base):
|
||||
|
||||
parameters = wtypes.wsattr({wtypes.text: types.jsontype}, mandatory=False,
|
||||
default={})
|
||||
interval = wsme.wsattr(int, mandatory=False)
|
||||
|
||||
def as_audit(self):
|
||||
audit_type_values = [val.value for val in objects.audit.AuditType]
|
||||
if self.audit_type not in audit_type_values:
|
||||
raise exception.AuditTypeNotFound(audit_type=self.audit_type)
|
||||
|
||||
if (self.audit_type == objects.audit.AuditType.ONESHOT.value and
|
||||
self.interval != wtypes.Unset):
|
||||
raise exception.AuditIntervalNotAllowed(audit_type=self.audit_type)
|
||||
|
||||
if (self.audit_type == objects.audit.AuditType.CONTINUOUS.value and
|
||||
self.interval == wtypes.Unset):
|
||||
raise exception.AuditIntervalNotSpecified(
|
||||
audit_type=self.audit_type)
|
||||
|
||||
return Audit(
|
||||
audit_template_id=self.audit_template_uuid,
|
||||
audit_type=self.audit_type,
|
||||
deadline=self.deadline,
|
||||
parameters=self.parameters,
|
||||
)
|
||||
interval=self.interval)
|
||||
|
||||
|
||||
class AuditPatchType(types.JsonPatchType):
|
||||
|
||||
@staticmethod
|
||||
def mandatory_attrs():
|
||||
return ['/audit_template_uuid']
|
||||
return ['/audit_template_uuid', '/type']
|
||||
|
||||
@staticmethod
|
||||
def validate(patch):
|
||||
serialized_patch = {'path': patch.path, 'op': patch.op}
|
||||
if patch.path in AuditPatchType.mandatory_attrs():
|
||||
msg = _("%(field)s can't be updated.")
|
||||
raise exception.PatchError(
|
||||
patch=serialized_patch,
|
||||
reason=msg % dict(field=patch.path))
|
||||
return types.JsonPatchType.validate(patch)
|
||||
|
||||
|
||||
class Audit(base.APIBase):
|
||||
@ -160,6 +180,9 @@ class Audit(base.APIBase):
|
||||
links = wsme.wsattr([link.Link], readonly=True)
|
||||
"""A list containing a self link and associated audit links"""
|
||||
|
||||
interval = wsme.wsattr(int, mandatory=False)
|
||||
"""Launch audit periodically (in seconds)"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.fields = []
|
||||
fields = list(objects.Audit.fields)
|
||||
@ -187,7 +210,7 @@ class Audit(base.APIBase):
|
||||
if not expand:
|
||||
audit.unset_fields_except(['uuid', 'audit_type', 'deadline',
|
||||
'state', 'audit_template_uuid',
|
||||
'audit_template_name'])
|
||||
'audit_template_name', 'interval'])
|
||||
|
||||
# The numeric ID should not be exposed to
|
||||
# the user, it's internal only.
|
||||
@ -215,7 +238,8 @@ class Audit(base.APIBase):
|
||||
deadline=None,
|
||||
created_at=datetime.datetime.utcnow(),
|
||||
deleted_at=None,
|
||||
updated_at=datetime.datetime.utcnow())
|
||||
updated_at=datetime.datetime.utcnow(),
|
||||
interval=7200)
|
||||
sample._audit_template_uuid = '7ae81bb3-dec3-4289-8d6c-da80bd8001ae'
|
||||
return cls._convert_with_links(sample, 'http://localhost:9322', expand)
|
||||
|
||||
@ -414,8 +438,9 @@ class AuditsController(rest.RestController):
|
||||
|
||||
# trigger decision-engine to run the audit
|
||||
|
||||
dc_client = rpcapi.DecisionEngineAPI()
|
||||
dc_client.trigger_audit(context, new_audit.uuid)
|
||||
if new_audit.audit_type == objects.audit.AuditType.ONESHOT.value:
|
||||
dc_client = rpcapi.DecisionEngineAPI()
|
||||
dc_client.trigger_audit(context, new_audit.uuid)
|
||||
|
||||
return Audit.convert_with_links(new_audit)
|
||||
|
||||
|
@ -221,6 +221,14 @@ class AuditAlreadyExists(Conflict):
|
||||
msg_fmt = _("An audit with UUID %(uuid)s already exists")
|
||||
|
||||
|
||||
class AuditIntervalNotSpecified(Invalid):
|
||||
msg_fmt = _("Interval of audit must be specified for %(audit_type)s.")
|
||||
|
||||
|
||||
class AuditIntervalNotAllowed(Invalid):
|
||||
msg_fmt = _("Interval of audit must not be set for %(audit_type)s.")
|
||||
|
||||
|
||||
class AuditReferenced(Invalid):
|
||||
msg_fmt = _("Audit %(audit)s is referenced by one or multiple action "
|
||||
"plans")
|
||||
|
@ -177,6 +177,7 @@ class Audit(Base):
|
||||
audit_template_id = Column(Integer, ForeignKey('audit_templates.id'),
|
||||
nullable=False)
|
||||
parameters = Column(JSONEncodedDict, nullable=True)
|
||||
interval = Column(Integer, nullable=True)
|
||||
|
||||
|
||||
class Action(Base):
|
||||
|
@ -2,6 +2,7 @@
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
# Alexander Chadin <a.chadin@servionica.ru>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -19,9 +20,92 @@
|
||||
import abc
|
||||
import six
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.common.messaging.events import event as watcher_event
|
||||
from watcher.decision_engine.messaging import events as de_events
|
||||
from watcher.decision_engine.planner import manager as planner_manager
|
||||
from watcher.decision_engine.strategy.context import default as default_context
|
||||
from watcher.objects import audit as audit_objects
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseAuditHandler(object):
|
||||
@abc.abstractmethod
|
||||
def execute(self, audit_uuid, request_context):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def pre_execute(self, audit_uuid, request_context):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def do_execute(self, audit, request_context):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def post_execute(self, audit, solution, request_context):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AuditHandler(BaseAuditHandler):
|
||||
def __init__(self, messaging):
|
||||
self._messaging = messaging
|
||||
self._strategy_context = default_context.DefaultStrategyContext()
|
||||
self._planner_manager = planner_manager.PlannerManager()
|
||||
self._planner = None
|
||||
|
||||
@property
|
||||
def planner(self):
|
||||
if self._planner is None:
|
||||
self._planner = self._planner_manager.load()
|
||||
return self._planner
|
||||
|
||||
@property
|
||||
def messaging(self):
|
||||
return self._messaging
|
||||
|
||||
@property
|
||||
def strategy_context(self):
|
||||
return self._strategy_context
|
||||
|
||||
def notify(self, audit_uuid, event_type, status):
|
||||
event = watcher_event.Event()
|
||||
event.type = event_type
|
||||
event.data = {}
|
||||
payload = {'audit_uuid': audit_uuid,
|
||||
'audit_status': status}
|
||||
self.messaging.status_topic_handler.publish_event(
|
||||
event.type.name, payload)
|
||||
|
||||
def update_audit_state(self, request_context, audit, state):
|
||||
LOG.debug("Update audit state: %s", state)
|
||||
audit.state = state
|
||||
audit.save()
|
||||
self.notify(audit.uuid, de_events.Events.TRIGGER_AUDIT, state)
|
||||
|
||||
def pre_execute(self, audit, request_context):
|
||||
LOG.debug("Trigger audit %s", audit.uuid)
|
||||
# change state of the audit to ONGOING
|
||||
self.update_audit_state(request_context, audit,
|
||||
audit_objects.State.ONGOING)
|
||||
|
||||
def post_execute(self, audit, solution, request_context):
|
||||
self.planner.schedule(request_context, audit.id, solution)
|
||||
|
||||
# change state of the audit to SUCCEEDED
|
||||
self.update_audit_state(request_context, audit,
|
||||
audit_objects.State.SUCCEEDED)
|
||||
|
||||
def execute(self, audit, request_context):
|
||||
try:
|
||||
self.pre_execute(audit, request_context)
|
||||
solution = self.do_execute(audit, request_context)
|
||||
self.post_execute(audit, solution, request_context)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
self.update_audit_state(request_context, audit,
|
||||
audit_objects.State.FAILED)
|
||||
|
126
watcher/decision_engine/audit/continuous.py
Normal file
126
watcher/decision_engine/audit/continuous.py
Normal file
@ -0,0 +1,126 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2016 Servionica LTD
|
||||
#
|
||||
# Authors: Alexander Chadin <a.chadin@servionica.ru>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import datetime
|
||||
|
||||
from apscheduler.schedulers import background
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from watcher.common import context
|
||||
from watcher.decision_engine.audit import base
|
||||
from watcher.objects import action_plan as action_objects
|
||||
from watcher.objects import audit as audit_objects
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
WATCHER_CONTINUOUS_OPTS = [
|
||||
cfg.IntOpt('continuous_audit_interval',
|
||||
default=10,
|
||||
help='Interval, in seconds, for checking new created'
|
||||
'continuous audit.')
|
||||
]
|
||||
|
||||
CONF.register_opts(WATCHER_CONTINUOUS_OPTS, 'watcher_decision_engine')
|
||||
|
||||
|
||||
class ContinuousAuditHandler(base.AuditHandler):
|
||||
def __init__(self, messaging):
|
||||
super(ContinuousAuditHandler, self).__init__(messaging)
|
||||
self._scheduler = None
|
||||
self.jobs = []
|
||||
self._start()
|
||||
self.context_show_deleted = context.RequestContext(is_admin=True,
|
||||
show_deleted=True)
|
||||
|
||||
@property
|
||||
def scheduler(self):
|
||||
if self._scheduler is None:
|
||||
self._scheduler = background.BackgroundScheduler()
|
||||
return self._scheduler
|
||||
|
||||
def _is_audit_inactive(self, audit):
|
||||
audit = audit_objects.Audit.get_by_uuid(self.context_show_deleted,
|
||||
audit.uuid)
|
||||
if audit.state in (audit_objects.State.CANCELLED,
|
||||
audit_objects.State.DELETED,
|
||||
audit_objects.State.FAILED):
|
||||
# if audit isn't in active states, audit's job must be removed to
|
||||
# prevent using of inactive audit in future.
|
||||
job_to_delete = [job for job in self.jobs
|
||||
if job.keys()[0] == audit.uuid][0]
|
||||
self.jobs.remove(job_to_delete)
|
||||
job_to_delete[audit.uuid].remove()
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def do_execute(self, audit, request_context):
|
||||
# execute the strategy
|
||||
solution = self.strategy_context.execute_strategy(audit.uuid,
|
||||
request_context)
|
||||
|
||||
if audit.audit_type == audit_objects.AuditType.CONTINUOUS.value:
|
||||
a_plan_filters = {'audit_uuid': audit.uuid,
|
||||
'state': action_objects.State.RECOMMENDED}
|
||||
action_plans = action_objects.ActionPlan.list(
|
||||
request_context,
|
||||
filters=a_plan_filters)
|
||||
for plan in action_plans:
|
||||
plan.state = action_objects.State.CANCELLED
|
||||
plan.save()
|
||||
return solution
|
||||
|
||||
def execute_audit(self, audit, request_context):
|
||||
if not self._is_audit_inactive(audit):
|
||||
self.execute(audit, request_context)
|
||||
|
||||
def post_execute(self, audit, solution, request_context):
|
||||
self.planner.schedule(request_context, audit.id, solution)
|
||||
|
||||
def launch_audits_periodically(self):
|
||||
audit_context = context.RequestContext(is_admin=True)
|
||||
audit_filters = {
|
||||
'audit_type': audit_objects.AuditType.CONTINUOUS.value,
|
||||
'state__in': (audit_objects.State.PENDING,
|
||||
audit_objects.State.ONGOING,
|
||||
audit_objects.State.SUCCEEDED)
|
||||
}
|
||||
audits = audit_objects.Audit.list(audit_context,
|
||||
filters=audit_filters)
|
||||
scheduler_job_args = [job.args for job in self.scheduler.get_jobs()
|
||||
if job.name == 'execute_audit']
|
||||
for audit in audits:
|
||||
if audit.uuid not in [arg[0].uuid for arg in scheduler_job_args]:
|
||||
job = self.scheduler.add_job(
|
||||
self.execute_audit, 'interval',
|
||||
args=[audit, audit_context],
|
||||
seconds=audit.interval,
|
||||
name='execute_audit',
|
||||
next_run_time=datetime.datetime.now())
|
||||
self.jobs.append({audit.uuid: job})
|
||||
|
||||
def _start(self):
|
||||
self.scheduler.add_job(
|
||||
self.launch_audits_periodically,
|
||||
'interval',
|
||||
seconds=CONF.watcher_decision_engine.continuous_audit_interval,
|
||||
next_run_time=datetime.datetime.now())
|
||||
self.scheduler.start()
|
@ -1,88 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.common.messaging.events import event as watcher_event
|
||||
from watcher.decision_engine.audit import base
|
||||
from watcher.decision_engine.messaging import events as de_events
|
||||
from watcher.decision_engine.planner import manager as planner_manager
|
||||
from watcher.decision_engine.strategy.context import default as default_context
|
||||
from watcher.objects import audit as audit_objects
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultAuditHandler(base.BaseAuditHandler):
|
||||
def __init__(self, messaging):
|
||||
super(DefaultAuditHandler, self).__init__()
|
||||
self._messaging = messaging
|
||||
self._strategy_context = default_context.DefaultStrategyContext()
|
||||
self._planner_manager = planner_manager.PlannerManager()
|
||||
self._planner = None
|
||||
|
||||
@property
|
||||
def planner(self):
|
||||
if self._planner is None:
|
||||
self._planner = self._planner_manager.load()
|
||||
return self._planner
|
||||
|
||||
@property
|
||||
def messaging(self):
|
||||
return self._messaging
|
||||
|
||||
@property
|
||||
def strategy_context(self):
|
||||
return self._strategy_context
|
||||
|
||||
def notify(self, audit_uuid, event_type, status):
|
||||
event = watcher_event.Event()
|
||||
event.type = event_type
|
||||
event.data = {}
|
||||
payload = {'audit_uuid': audit_uuid,
|
||||
'audit_status': status}
|
||||
self.messaging.status_topic_handler.publish_event(
|
||||
event.type.name, payload)
|
||||
|
||||
def update_audit_state(self, request_context, audit_uuid, state):
|
||||
LOG.debug("Update audit state: %s", state)
|
||||
audit = audit_objects.Audit.get_by_uuid(request_context, audit_uuid)
|
||||
audit.state = state
|
||||
audit.save()
|
||||
self.notify(audit_uuid, de_events.Events.TRIGGER_AUDIT, state)
|
||||
return audit
|
||||
|
||||
def execute(self, audit_uuid, request_context):
|
||||
try:
|
||||
LOG.debug("Trigger audit %s", audit_uuid)
|
||||
# change state of the audit to ONGOING
|
||||
audit = self.update_audit_state(request_context, audit_uuid,
|
||||
audit_objects.State.ONGOING)
|
||||
|
||||
# execute the strategy
|
||||
solution = self.strategy_context.execute_strategy(audit_uuid,
|
||||
request_context)
|
||||
|
||||
self.planner.schedule(request_context, audit.id, solution)
|
||||
|
||||
# change state of the audit to SUCCEEDED
|
||||
self.update_audit_state(request_context, audit_uuid,
|
||||
audit_objects.State.SUCCEEDED)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
self.update_audit_state(request_context, audit_uuid,
|
||||
audit_objects.State.FAILED)
|
26
watcher/decision_engine/audit/oneshot.py
Normal file
26
watcher/decision_engine/audit/oneshot.py
Normal file
@ -0,0 +1,26 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from watcher.decision_engine.audit import base
|
||||
|
||||
|
||||
class OneShotAuditHandler(base.AuditHandler):
|
||||
def do_execute(self, audit, request_context):
|
||||
# execute the strategy
|
||||
solution = self.strategy_context.execute_strategy(audit.uuid,
|
||||
request_context)
|
||||
|
||||
return solution
|
@ -21,7 +21,9 @@ from concurrent import futures
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.decision_engine.audit import default
|
||||
from watcher.decision_engine.audit import continuous as continuous_handler
|
||||
from watcher.decision_engine.audit import oneshot as oneshot_handler
|
||||
from watcher.objects import audit as audit_objects
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -32,6 +34,10 @@ class AuditEndpoint(object):
|
||||
self._messaging = messaging
|
||||
self._executor = futures.ThreadPoolExecutor(
|
||||
max_workers=CONF.watcher_decision_engine.max_workers)
|
||||
self._oneshot_handler = oneshot_handler.OneShotAuditHandler(
|
||||
self.messaging)
|
||||
self._continuous_handler = continuous_handler.ContinuousAuditHandler(
|
||||
self.messaging)
|
||||
|
||||
@property
|
||||
def executor(self):
|
||||
@ -42,8 +48,8 @@ class AuditEndpoint(object):
|
||||
return self._messaging
|
||||
|
||||
def do_trigger_audit(self, context, audit_uuid):
|
||||
audit = default.DefaultAuditHandler(self.messaging)
|
||||
audit.execute(audit_uuid, context)
|
||||
audit = audit_objects.Audit.get_by_uuid(context, audit_uuid)
|
||||
self._oneshot_handler.execute(audit, context)
|
||||
|
||||
def trigger_audit(self, context, audit_uuid):
|
||||
LOG.debug("Trigger audit %s" % audit_uuid)
|
||||
|
@ -86,6 +86,7 @@ class Audit(base.WatcherObject):
|
||||
'deadline': obj_utils.datetime_or_str_or_none,
|
||||
'audit_template_id': obj_utils.int_or_none,
|
||||
'parameters': obj_utils.dict_or_none,
|
||||
'interval': obj_utils.int_or_none,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
|
@ -477,6 +477,7 @@ class TestPost(api_base.FunctionalTest):
|
||||
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
del audit_dict['interval']
|
||||
|
||||
response = self.post_json('/audits', audit_dict)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
@ -517,6 +518,7 @@ class TestPost(api_base.FunctionalTest):
|
||||
audit_dict = post_get_test_audit()
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
del audit_dict['interval']
|
||||
# Make the audit template UUID some garbage value
|
||||
audit_dict['audit_template_uuid'] = (
|
||||
'01234567-8910-1112-1314-151617181920')
|
||||
@ -537,6 +539,7 @@ class TestPost(api_base.FunctionalTest):
|
||||
state = audit_dict['state']
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
del audit_dict['interval']
|
||||
with mock.patch.object(self.dbapi, 'create_audit',
|
||||
wraps=self.dbapi.create_audit) as cn_mock:
|
||||
response = self.post_json('/audits', audit_dict)
|
||||
@ -552,6 +555,7 @@ class TestPost(api_base.FunctionalTest):
|
||||
audit_dict = post_get_test_audit()
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
del audit_dict['interval']
|
||||
|
||||
response = self.post_json('/audits', audit_dict)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
@ -560,12 +564,66 @@ class TestPost(api_base.FunctionalTest):
|
||||
response.json['state'])
|
||||
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
|
||||
|
||||
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
|
||||
def test_create_continuous_audit_with_period(self, mock_trigger_audit):
|
||||
mock_trigger_audit.return_value = mock.ANY
|
||||
|
||||
audit_dict = post_get_test_audit()
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
|
||||
audit_dict['interval'] = 1200
|
||||
|
||||
response = self.post_json('/audits', audit_dict)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertEqual(201, response.status_int)
|
||||
self.assertEqual(objects.audit.State.PENDING,
|
||||
response.json['state'])
|
||||
self.assertEqual(audit_dict['interval'], response.json['interval'])
|
||||
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
|
||||
|
||||
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
|
||||
def test_create_continuous_audit_without_period(self, mock_trigger_audit):
|
||||
mock_trigger_audit.return_value = mock.ANY
|
||||
|
||||
audit_dict = post_get_test_audit()
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
|
||||
del audit_dict['interval']
|
||||
|
||||
response = self.post_json('/audits', audit_dict, expect_errors=True)
|
||||
self.assertEqual(400, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
expected_error_msg = ('Interval of audit must be specified '
|
||||
'for CONTINUOUS.')
|
||||
self.assertTrue(response.json['error_message'])
|
||||
self.assertTrue(expected_error_msg in response.json['error_message'])
|
||||
|
||||
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
|
||||
def test_create_oneshot_audit_with_period(self, mock_trigger_audit):
|
||||
mock_trigger_audit.return_value = mock.ANY
|
||||
|
||||
audit_dict = post_get_test_audit()
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
audit_dict['audit_type'] = objects.audit.AuditType.ONESHOT.value
|
||||
audit_dict['interval'] = 1200
|
||||
|
||||
response = self.post_json('/audits', audit_dict, expect_errors=True)
|
||||
self.assertEqual(400, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
expected_error_msg = 'Interval of audit must not be set for ONESHOT.'
|
||||
self.assertTrue(response.json['error_message'])
|
||||
self.assertTrue(expected_error_msg in response.json['error_message'])
|
||||
|
||||
def test_create_audit_trigger_decision_engine(self):
|
||||
with mock.patch.object(deapi.DecisionEngineAPI,
|
||||
'trigger_audit') as de_mock:
|
||||
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
del audit_dict['interval']
|
||||
response = self.post_json('/audits', audit_dict)
|
||||
de_mock.assert_called_once_with(mock.ANY, response.json['uuid'])
|
||||
|
||||
@ -586,6 +644,7 @@ class TestPost(api_base.FunctionalTest):
|
||||
audit_dict = post_get_test_audit(parameters={'name': 'Tom'})
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
del audit_dict['interval']
|
||||
|
||||
response = self.post_json('/audits', audit_dict, expect_errors=True)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
@ -605,6 +664,7 @@ class TestPost(api_base.FunctionalTest):
|
||||
parameters={'name': 'Tom'})
|
||||
del audit_dict['uuid']
|
||||
del audit_dict['state']
|
||||
del audit_dict['interval']
|
||||
|
||||
response = self.post_json('/audits', audit_dict, expect_errors=True)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
|
@ -62,6 +62,7 @@ def get_test_audit(**kwargs):
|
||||
'updated_at': kwargs.get('updated_at'),
|
||||
'deleted_at': kwargs.get('deleted_at'),
|
||||
'parameters': kwargs.get('parameters', {}),
|
||||
'interval': kwargs.get('period', 3600),
|
||||
}
|
||||
|
||||
|
||||
|
139
watcher/tests/decision_engine/audit/test_audit_handlers.py
Normal file
139
watcher/tests/decision_engine/audit/test_audit_handlers.py
Normal file
@ -0,0 +1,139 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import mock
|
||||
import uuid
|
||||
|
||||
from apscheduler.schedulers import background
|
||||
|
||||
from watcher.decision_engine.audit import continuous
|
||||
from watcher.decision_engine.audit import oneshot
|
||||
from watcher.decision_engine.messaging import events
|
||||
from watcher.metrics_engine.cluster_model_collector import manager
|
||||
from watcher.objects import audit as audit_objects
|
||||
from watcher.tests.db import base
|
||||
from watcher.tests.decision_engine.strategy.strategies import \
|
||||
faker_cluster_state as faker
|
||||
from watcher.tests.objects import utils as obj_utils
|
||||
|
||||
|
||||
class TestOneShotAuditHandler(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestOneShotAuditHandler, self).setUp()
|
||||
obj_utils.create_test_goal(self.context, id=1, name="dummy")
|
||||
audit_template = obj_utils.create_test_audit_template(
|
||||
self.context)
|
||||
self.audit = obj_utils.create_test_audit(
|
||||
self.context,
|
||||
audit_template_id=audit_template.id)
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_without_errors(self, mock_collector):
|
||||
mock_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_state_succeeded(self, mock_collector):
|
||||
mock_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
audit = audit_objects.Audit.get_by_uuid(self.context, self.audit.uuid)
|
||||
self.assertEqual(audit_objects.State.SUCCEEDED, audit.state)
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_send_notification(self, mock_collector):
|
||||
messaging = mock.MagicMock()
|
||||
mock_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(messaging)
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
call_on_going = mock.call(events.Events.TRIGGER_AUDIT.name, {
|
||||
'audit_status': audit_objects.State.ONGOING,
|
||||
'audit_uuid': self.audit.uuid})
|
||||
call_succeeded = mock.call(events.Events.TRIGGER_AUDIT.name, {
|
||||
'audit_status': audit_objects.State.SUCCEEDED,
|
||||
'audit_uuid': self.audit.uuid})
|
||||
|
||||
calls = [call_on_going, call_succeeded]
|
||||
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||
self.assertEqual(
|
||||
2, messaging.status_topic_handler.publish_event.call_count)
|
||||
|
||||
|
||||
class TestContinuousAuditHandler(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestContinuousAuditHandler, self).setUp()
|
||||
obj_utils.create_test_goal(self.context, id=1, name="DUMMY")
|
||||
audit_template = obj_utils.create_test_audit_template(
|
||||
self.context)
|
||||
self.audits = [obj_utils.create_test_audit(
|
||||
self.context,
|
||||
uuid=uuid.uuid4(),
|
||||
audit_template_id=audit_template.id,
|
||||
audit_type=audit_objects.AuditType.CONTINUOUS.value)
|
||||
for i in range(2)]
|
||||
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(audit_objects.Audit, 'list')
|
||||
def test_launch_audits_periodically(self, mock_list,
|
||||
mock_jobs, mock_add_job):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
audits = [audit_objects.Audit.get_by_uuid(self.context,
|
||||
self.audits[0].uuid)]
|
||||
mock_list.return_value = audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
audit_handler.launch_audits_periodically()
|
||||
mock_add_job.assert_called()
|
||||
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(audit_objects.Audit, 'list')
|
||||
def test_launch_multiply_audits_periodically(self, mock_list,
|
||||
mock_jobs, mock_add_job):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
audits = [audit_objects.Audit.get_by_uuid(
|
||||
self.context,
|
||||
audit.uuid) for audit in self.audits]
|
||||
mock_list.return_value = audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
calls = [mock.call(audit_handler.execute_audit, 'interval',
|
||||
args=[mock.ANY, mock.ANY],
|
||||
seconds=3600,
|
||||
name='execute_audit',
|
||||
next_run_time=mock.ANY) for audit in self.audits]
|
||||
audit_handler.launch_audits_periodically()
|
||||
mock_add_job.assert_has_calls(calls)
|
||||
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(audit_objects.Audit, 'list')
|
||||
def test_period_audit_not_called_when_deleted(self, mock_list,
|
||||
mock_jobs, mock_add_job):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
audits = [audit_objects.Audit.get_by_uuid(
|
||||
self.context,
|
||||
audit.uuid) for audit in self.audits]
|
||||
mock_list.return_value = audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
audits[1].state = audit_objects.State.CANCELLED
|
||||
calls = [mock.call(audit_handler.execute_audit, 'interval',
|
||||
args=[mock.ANY, mock.ANY],
|
||||
seconds=3600,
|
||||
name='execute_audit',
|
||||
next_run_time=mock.ANY)]
|
||||
audit_handler.launch_audits_periodically()
|
||||
mock_add_job.assert_has_calls(calls)
|
@ -1,69 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import mock
|
||||
|
||||
from watcher.decision_engine.audit import default as default
|
||||
from watcher.decision_engine.messaging import events
|
||||
from watcher.metrics_engine.cluster_model_collector import manager
|
||||
from watcher.objects import audit as audit_objects
|
||||
from watcher.tests.db import base
|
||||
from watcher.tests.decision_engine.strategy.strategies import \
|
||||
faker_cluster_state as faker
|
||||
from watcher.tests.objects import utils as obj_utils
|
||||
|
||||
|
||||
class TestDefaultAuditHandler(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestDefaultAuditHandler, self).setUp()
|
||||
obj_utils.create_test_goal(self.context, id=1, name="dummy")
|
||||
audit_template = obj_utils.create_test_audit_template(
|
||||
self.context)
|
||||
self.audit = obj_utils.create_test_audit(
|
||||
self.context,
|
||||
audit_template_id=audit_template.id)
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_without_errors(self, mock_collector):
|
||||
mock_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = default.DefaultAuditHandler(mock.MagicMock())
|
||||
audit_handler.execute(self.audit.uuid, self.context)
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_state_succeeded(self, mock_collector):
|
||||
mock_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = default.DefaultAuditHandler(mock.MagicMock())
|
||||
audit_handler.execute(self.audit.uuid, self.context)
|
||||
audit = audit_objects.Audit.get_by_uuid(self.context, self.audit.uuid)
|
||||
self.assertEqual(audit_objects.State.SUCCEEDED, audit.state)
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_send_notification(self, mock_collector):
|
||||
messaging = mock.MagicMock()
|
||||
mock_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = default.DefaultAuditHandler(messaging)
|
||||
audit_handler.execute(self.audit.uuid, self.context)
|
||||
|
||||
call_on_going = mock.call(events.Events.TRIGGER_AUDIT.name, {
|
||||
'audit_status': audit_objects.State.ONGOING,
|
||||
'audit_uuid': self.audit.uuid})
|
||||
call_succeeded = mock.call(events.Events.TRIGGER_AUDIT.name, {
|
||||
'audit_status': audit_objects.State.SUCCEEDED,
|
||||
'audit_uuid': self.audit.uuid})
|
||||
|
||||
calls = [call_on_going, call_succeeded]
|
||||
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||
self.assertEqual(
|
||||
2, messaging.status_topic_handler.publish_event.call_count)
|
@ -16,8 +16,7 @@
|
||||
|
||||
import mock
|
||||
|
||||
from watcher.common import utils
|
||||
from watcher.decision_engine.audit import default
|
||||
from watcher.decision_engine.audit import oneshot as oneshot_handler
|
||||
from watcher.decision_engine.messaging import audit_endpoint
|
||||
from watcher.metrics_engine.cluster_model_collector import manager
|
||||
from watcher.tests.db import base
|
||||
@ -38,28 +37,29 @@ class TestAuditEndpoint(base.DbTestCase):
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_do_trigger_audit(self, mock_collector):
|
||||
mock_collector.return_value = faker_cluster_state.FakerModelCollector()
|
||||
audit_uuid = utils.generate_uuid()
|
||||
|
||||
audit_handler = default.DefaultAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock())
|
||||
endpoint = audit_endpoint.AuditEndpoint(audit_handler)
|
||||
|
||||
with mock.patch.object(default.DefaultAuditHandler,
|
||||
with mock.patch.object(oneshot_handler.OneShotAuditHandler,
|
||||
'execute') as mock_call:
|
||||
mock_call.return_value = 0
|
||||
endpoint.do_trigger_audit(audit_handler, audit_uuid)
|
||||
endpoint.do_trigger_audit(self.context, self.audit.uuid)
|
||||
|
||||
mock_call.assert_called_once_with(audit_uuid, audit_handler)
|
||||
self.assertEqual(mock_call.call_count, 1)
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit(self, mock_collector):
|
||||
mock_collector.return_value = faker_cluster_state.FakerModelCollector()
|
||||
audit_uuid = utils.generate_uuid()
|
||||
audit_handler = default.DefaultAuditHandler(mock.MagicMock())
|
||||
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock())
|
||||
endpoint = audit_endpoint.AuditEndpoint(audit_handler)
|
||||
|
||||
with mock.patch.object(default.DefaultAuditHandler, 'execute') \
|
||||
as mock_call:
|
||||
mock_call.return_value = 0
|
||||
endpoint.trigger_audit(audit_handler, audit_uuid)
|
||||
with mock.patch.object(endpoint.executor, 'submit') as mock_call:
|
||||
mock_execute = mock.call(endpoint.do_trigger_audit,
|
||||
self.context,
|
||||
self.audit.uuid)
|
||||
endpoint.trigger_audit(self.context, self.audit.uuid)
|
||||
|
||||
mock_call.assert_called_once_with(audit_uuid, audit_handler)
|
||||
mock_call.assert_has_calls([mock_execute])
|
||||
self.assertEqual(mock_call.call_count, 1)
|
||||
|
Loading…
Reference in New Issue
Block a user