Restructure watchrules to make them more testable
Change-Id: Ic8085de3f5692249d82e68462bbed02da787712f Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
This commit is contained in:
parent
a9135324f6
commit
2d4d5529f9
@ -20,10 +20,11 @@ import datetime
|
||||
import logging
|
||||
import webob
|
||||
from heat import manager
|
||||
from heat.db import api as db_api
|
||||
from heat.common import config
|
||||
from heat.engine import parser
|
||||
from heat.engine import resources
|
||||
from heat.db import api as db_api
|
||||
from heat.engine import watchrule
|
||||
from heat.openstack.common import timeutils
|
||||
|
||||
from novaclient.v1_1 import client
|
||||
@ -363,112 +364,61 @@ class EngineManager(manager.Manager):
|
||||
pt.save()
|
||||
return [None, metadata]
|
||||
|
||||
def do_data_cmp(self, rule, data, threshold):
|
||||
op = rule['ComparisonOperator']
|
||||
if op == 'GreaterThanThreshold':
|
||||
return data > threshold
|
||||
elif op == 'GreaterThanOrEqualToThreshold':
|
||||
return data >= threshold
|
||||
elif op == 'LessThanThreshold':
|
||||
return data < threshold
|
||||
elif op == 'LessThanOrEqualToThreshold':
|
||||
return data <= threshold
|
||||
else:
|
||||
return False
|
||||
|
||||
def do_data_calc(self, rule, rolling, metric):
|
||||
|
||||
stat = rule['Statistic']
|
||||
if stat == 'Maximum':
|
||||
if metric > rolling:
|
||||
return metric
|
||||
else:
|
||||
return rolling
|
||||
elif stat == 'Minimum':
|
||||
if metric < rolling:
|
||||
return metric
|
||||
else:
|
||||
return rolling
|
||||
else:
|
||||
return metric + rolling
|
||||
|
||||
@manager.periodic_task
|
||||
def _periodic_watcher_task(self, context):
|
||||
|
||||
now = timeutils.utcnow()
|
||||
wrs = db_api.watch_rule_get_all(context)
|
||||
for wr in wrs:
|
||||
logger.debug('_periodic_watcher_task %s' % wr.name)
|
||||
# has enough time progressed to run the rule
|
||||
dt_period = datetime.timedelta(seconds=int(wr.rule['Period']))
|
||||
if now < (wr.last_evaluated + dt_period):
|
||||
continue
|
||||
|
||||
# get dataset ordered by creation_at
|
||||
# - most recient first
|
||||
periods = int(wr.rule['EvaluationPeriods'])
|
||||
self.run_rule(context, wr, now)
|
||||
|
||||
# TODO fix this
|
||||
# initial assumption: all samples are in this period
|
||||
period = int(wr.rule['Period'])
|
||||
#wds = db_api.watch_data_get_all(context, wr.id)
|
||||
wds = wr.watch_data
|
||||
def run_rule(self, context, wr, now=timeutils.utcnow()):
|
||||
action_map = {'ALARM': 'AlarmActions',
|
||||
'NORMAL': 'OKActions',
|
||||
'NODATA': 'InsufficientDataActions'}
|
||||
|
||||
stat = wr.rule['Statistic']
|
||||
data = 0
|
||||
samples = 0
|
||||
for d in wds:
|
||||
if d.created_at < wr.last_evaluated:
|
||||
continue
|
||||
samples = samples + 1
|
||||
metric = 1
|
||||
data = samples
|
||||
if stat != 'SampleCount':
|
||||
metric = int(d.data[wr.rule['MetricName']]['Value'])
|
||||
data = self.do_data_calc(wr.rule, data, metric)
|
||||
watcher = watchrule.WatchRule(wr.rule, wr.watch_data,
|
||||
wr.last_evaluated, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
|
||||
if stat == 'Average' and samples > 0:
|
||||
data = data / samples
|
||||
if new_state != wr.state:
|
||||
wr.state = new_state
|
||||
wr.save()
|
||||
logger.warn('WATCH: stack:%s, watch_name:%s %s',
|
||||
wr.stack_name, wr.name, new_state)
|
||||
|
||||
alarming = self.do_data_cmp(wr.rule, data,
|
||||
int(wr.rule['Threshold']))
|
||||
logger.debug('%s: %d/%d => %d (current state:%s)' %
|
||||
(wr.rule['MetricName'],
|
||||
int(wr.rule['Threshold']),
|
||||
data, alarming, wr.state))
|
||||
if alarming and wr.state != 'ALARM':
|
||||
wr.state = 'ALARM'
|
||||
wr.save()
|
||||
logger.warn('ALARM> stack:%s, watch_name:%s',
|
||||
wr.stack_name, wr.name)
|
||||
#s = db_api.stack_get(None, wr.stack_name)
|
||||
#if s:
|
||||
# ps = parser.Stack(s.name,
|
||||
# s.raw_template.parsed_template.template,
|
||||
# s.id,
|
||||
# params)
|
||||
# for a in wr.rule['AlarmActions']:
|
||||
# ps.resources[a].alarm()
|
||||
if not action_map[new_state] in wr.rule:
|
||||
logger.info('no action for new state %s',
|
||||
new_state)
|
||||
else:
|
||||
s = db_api.stack_get(None, wr.stack_name)
|
||||
if s:
|
||||
ps = parser.Stack(context, s.name,
|
||||
s.raw_template.parsed_template.template,
|
||||
s.id)
|
||||
for a in wr.rule[action_map[new_state]]:
|
||||
ps.resources[a].alarm()
|
||||
|
||||
elif not alarming and wr.state == 'ALARM':
|
||||
wr.state = 'NORMAL'
|
||||
wr.save()
|
||||
logger.info('NORMAL> stack:%s, watch_name:%s',
|
||||
wr.stack_name, wr.name)
|
||||
|
||||
wr.last_evaluated = now
|
||||
wr.last_evaluated = now
|
||||
|
||||
def create_watch_data(self, context, watch_name, stats_data):
|
||||
'''
|
||||
This could be used by CloudWatch and WaitConditions
|
||||
and treat HA service events like any other CloudWatch.
|
||||
'''
|
||||
|
||||
wr = db_api.watch_rule_get(context, watch_name)
|
||||
if wr is None:
|
||||
logger.warn('NoSuch watch:%s' % (watch_name))
|
||||
return ['NoSuch Watch Rule', None]
|
||||
|
||||
if not wr.rule['MetricName'] in stats_data:
|
||||
logger.warn('new data has incorrect metric:%s' %
|
||||
(wr.rule['MetricName']))
|
||||
return ['MetricName %s missing' % wr.rule['MetricName'], None]
|
||||
|
||||
watch_data = {
|
||||
@ -476,5 +426,8 @@ class EngineManager(manager.Manager):
|
||||
'watch_rule_id': wr.id
|
||||
}
|
||||
wd = db_api.watch_data_create(context, watch_data)
|
||||
logger.debug('new watch:%s data:%s' % (watch_name, str(wd.data)))
|
||||
if wr.rule['Statistic'] == 'SampleCount':
|
||||
self.run_rule(context, wr)
|
||||
|
||||
return [None, wd.data]
|
||||
|
142
heat/engine/watchrule.py
Normal file
142
heat/engine/watchrule.py
Normal file
@ -0,0 +1,142 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from heat.openstack.common import timeutils
|
||||
|
||||
logger = logging.getLogger('heat.engine.watchrule')
|
||||
|
||||
|
||||
class WatchRule(object):
|
||||
ALARM = 'ALARM'
|
||||
NORMAL = 'NORMAL'
|
||||
NODATA = 'NODATA'
|
||||
|
||||
def __init__(self, rule, dataset, last_evaluated, now):
|
||||
self.rule = rule
|
||||
self.data = dataset
|
||||
self.last_evaluated = last_evaluated
|
||||
self.now = now
|
||||
self.timeperiod = datetime.timedelta(seconds=int(self.rule['Period']))
|
||||
|
||||
def do_data_cmp(self, data, threshold):
|
||||
op = self.rule['ComparisonOperator']
|
||||
if op == 'GreaterThanThreshold':
|
||||
return data > threshold
|
||||
elif op == 'GreaterThanOrEqualToThreshold':
|
||||
return data >= threshold
|
||||
elif op == 'LessThanThreshold':
|
||||
return data < threshold
|
||||
elif op == 'LessThanOrEqualToThreshold':
|
||||
return data <= threshold
|
||||
else:
|
||||
return False
|
||||
|
||||
def do_Maximum(self):
|
||||
data = 0
|
||||
have_data = False
|
||||
for d in self.data:
|
||||
if d.created_at < self.now - self.timeperiod:
|
||||
continue
|
||||
if not have_data:
|
||||
data = int(d.data[self.rule['MetricName']]['Value'])
|
||||
have_data = True
|
||||
if int(d.data[self.rule['MetricName']]['Value']) > data:
|
||||
data = int(d.data[self.rule['MetricName']]['Value'])
|
||||
|
||||
if not have_data:
|
||||
return self.NODATA
|
||||
|
||||
if self.do_data_cmp(data,
|
||||
int(self.rule['Threshold'])):
|
||||
return self.ALARM
|
||||
else:
|
||||
return self.NORMAL
|
||||
|
||||
def do_Minimum(self):
|
||||
data = 0
|
||||
have_data = False
|
||||
for d in self.data:
|
||||
if d.created_at < self.now - self.timeperiod:
|
||||
continue
|
||||
if not have_data:
|
||||
data = int(d.data[self.rule['MetricName']]['Value'])
|
||||
have_data = True
|
||||
elif int(d.data[self.rule['MetricName']]['Value']) < data:
|
||||
data = int(d.data[self.rule['MetricName']]['Value'])
|
||||
|
||||
if not have_data:
|
||||
return self.NODATA
|
||||
|
||||
if self.do_data_cmp(data,
|
||||
int(self.rule['Threshold'])):
|
||||
return self.ALARM
|
||||
else:
|
||||
return self.NORMAL
|
||||
|
||||
def do_SampleCount(self):
|
||||
'''
|
||||
count all samples within the specified period
|
||||
'''
|
||||
data = 0
|
||||
for d in self.data:
|
||||
if d.created_at < self.now - self.timeperiod:
|
||||
continue
|
||||
data = data + 1
|
||||
|
||||
if self.do_data_cmp(data,
|
||||
int(self.rule['Threshold'])):
|
||||
return self.ALARM
|
||||
else:
|
||||
return self.NORMAL
|
||||
|
||||
def do_Average(self):
|
||||
data = 0
|
||||
samples = 0
|
||||
for d in self.data:
|
||||
if d.created_at < self.now - self.timeperiod:
|
||||
continue
|
||||
samples = samples + 1
|
||||
data = data + int(d.data[self.rule['MetricName']]['Value'])
|
||||
|
||||
if samples == 0:
|
||||
return self.NODATA
|
||||
|
||||
data = data / samples
|
||||
if self.do_data_cmp(data,
|
||||
int(self.rule['Threshold'])):
|
||||
return self.ALARM
|
||||
else:
|
||||
return self.NORMAL
|
||||
|
||||
def do_Sum(self):
|
||||
data = 0
|
||||
for d in self.data:
|
||||
if d.created_at < self.now - self.timeperiod:
|
||||
logger.debug('ignoring %s' % str(d.data))
|
||||
continue
|
||||
data = data + int(d.data[self.rule['MetricName']]['Value'])
|
||||
|
||||
if self.do_data_cmp(data,
|
||||
int(self.rule['Threshold'])):
|
||||
return self.ALARM
|
||||
else:
|
||||
return self.NORMAL
|
||||
|
||||
def get_alarm_state(self):
|
||||
fn = getattr(self, 'do_%s' % self.rule['Statistic'])
|
||||
return fn()
|
177
heat/tests/test_watch.py
Normal file
177
heat/tests/test_watch.py
Normal file
@ -0,0 +1,177 @@
|
||||
import datetime
|
||||
import mox
|
||||
import nose
|
||||
from nose.plugins.attrib import attr
|
||||
from nose import with_setup
|
||||
import unittest
|
||||
from nose.exc import SkipTest
|
||||
import logging
|
||||
|
||||
from heat.openstack.common import timeutils
|
||||
try:
|
||||
from heat.engine import watchrule
|
||||
except:
|
||||
raise SkipTest("unable to import watchrule, skipping")
|
||||
|
||||
|
||||
logger = logging.getLogger('test_watch')
|
||||
|
||||
|
||||
class WatchData:
|
||||
def __init__(self, data, created_at):
|
||||
self.created_at = created_at
|
||||
self.data = {'test_metric': {'Value': data,
|
||||
'Unit': 'Count'}}
|
||||
|
||||
|
||||
class WatchRuleTest(unittest.TestCase):
|
||||
|
||||
@attr(tag=['unit', 'watchrule'])
|
||||
@attr(speed='fast')
|
||||
def test_minimum(self):
|
||||
rule = {
|
||||
'EvaluationPeriods': '1',
|
||||
'MetricName': 'test_metric',
|
||||
'Period': '300',
|
||||
'Statistic': 'Minimum',
|
||||
'ComparisonOperator': 'LessThanOrEqualToThreshold',
|
||||
'Threshold': '50'}
|
||||
|
||||
now = timeutils.utcnow()
|
||||
last = now - datetime.timedelta(seconds=320)
|
||||
data = [WatchData(77, now - datetime.timedelta(seconds=100))]
|
||||
data.append(WatchData(53, now - datetime.timedelta(seconds=150)))
|
||||
|
||||
# all > 50 -> NORMAL
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'NORMAL')
|
||||
|
||||
data.append(WatchData(25, now - datetime.timedelta(seconds=250)))
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'ALARM')
|
||||
|
||||
@attr(tag=['unit', 'watchrule'])
|
||||
@attr(speed='fast')
|
||||
def test_maximum(self):
|
||||
rule = {
|
||||
'EvaluationPeriods': '1',
|
||||
'MetricName': 'test_metric',
|
||||
'Period': '300',
|
||||
'Statistic': 'Maximum',
|
||||
'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
|
||||
'Threshold': '30'}
|
||||
|
||||
now = timeutils.utcnow()
|
||||
last = now - datetime.timedelta(seconds=320)
|
||||
data = [WatchData(7, now - datetime.timedelta(seconds=100))]
|
||||
data.append(WatchData(23, now - datetime.timedelta(seconds=150)))
|
||||
|
||||
# all < 30 -> NORMAL
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'NORMAL')
|
||||
|
||||
data.append(WatchData(35, now - datetime.timedelta(seconds=150)))
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'ALARM')
|
||||
|
||||
@attr(tag=['unit', 'watchrule'])
|
||||
@attr(speed='fast')
|
||||
def test_samplecount(self):
|
||||
|
||||
rule = {
|
||||
'EvaluationPeriods': '1',
|
||||
'MetricName': 'test_metric',
|
||||
'Period': '300',
|
||||
'Statistic': 'SampleCount',
|
||||
'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
|
||||
'Threshold': '3'}
|
||||
|
||||
now = timeutils.utcnow()
|
||||
last = now - datetime.timedelta(seconds=320)
|
||||
data = [WatchData(1, now - datetime.timedelta(seconds=100))]
|
||||
data.append(WatchData(1, now - datetime.timedelta(seconds=150)))
|
||||
|
||||
# only 2 samples -> NORMAL
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'NORMAL')
|
||||
|
||||
# only 3 samples -> ALARM
|
||||
data.append(WatchData(1, now - datetime.timedelta(seconds=200)))
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'ALARM')
|
||||
|
||||
# only 3 samples (one old) -> NORMAL
|
||||
data.pop(0)
|
||||
data.append(WatchData(1, now - datetime.timedelta(seconds=400)))
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'NORMAL')
|
||||
|
||||
@attr(tag=['unit', 'watchrule'])
|
||||
@attr(speed='fast')
|
||||
def test_sum(self):
|
||||
rule = {
|
||||
'EvaluationPeriods': '1',
|
||||
'MetricName': 'test_metric',
|
||||
'Period': '300',
|
||||
'Statistic': 'Sum',
|
||||
'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
|
||||
'Threshold': '100'}
|
||||
|
||||
now = timeutils.utcnow()
|
||||
last = now - datetime.timedelta(seconds=320)
|
||||
data = [WatchData(17, now - datetime.timedelta(seconds=100))]
|
||||
data.append(WatchData(23, now - datetime.timedelta(seconds=150)))
|
||||
|
||||
# all < 40 -> NORMAL
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'NORMAL')
|
||||
|
||||
# sum > 100 -> ALARM
|
||||
data.append(WatchData(85, now - datetime.timedelta(seconds=150)))
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'ALARM')
|
||||
|
||||
@attr(tag=['unit', 'watchrule'])
|
||||
@attr(speed='fast')
|
||||
def test_ave(self):
|
||||
rule = {
|
||||
'EvaluationPeriods': '1',
|
||||
'MetricName': 'test_metric',
|
||||
'Period': '300',
|
||||
'Statistic': 'Average',
|
||||
'ComparisonOperator': 'GreaterThanThreshold',
|
||||
'Threshold': '100'}
|
||||
|
||||
now = timeutils.utcnow()
|
||||
last = now - datetime.timedelta(seconds=320)
|
||||
data = [WatchData(117, now - datetime.timedelta(seconds=100))]
|
||||
data.append(WatchData(23, now - datetime.timedelta(seconds=150)))
|
||||
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'NORMAL')
|
||||
|
||||
data.append(WatchData(195, now - datetime.timedelta(seconds=250)))
|
||||
watcher = watchrule.WatchRule(rule, data, last, now)
|
||||
new_state = watcher.get_alarm_state()
|
||||
logger.info(new_state)
|
||||
assert(new_state == 'ALARM')
|
Loading…
x
Reference in New Issue
Block a user