New grammar for SLA specification

The new grammar allows to specify record selector and condition
in the same string. Example:
 '[type == "agent"] >> (stats.bandwidth.mean > 800)' -- pick records
with attribute 'type' equal to 'agent' and check condition against every.

Change-Id: Icdf92400991029a1f06a6487679faad8acea7afe
This commit is contained in:
Ilya Shakhat 2015-04-17 19:46:11 +03:00
parent 0b1e10c125
commit 523fbdb645
9 changed files with 293 additions and 97 deletions

View File

@ -12,6 +12,9 @@ execution:
title: Iperf TCP test title: Iperf TCP test
class: iperf_graph class: iperf_graph
time: 60 time: 60
sla:
- "[type == 'agent'] >> (stats.bandwidth.min > 100)"
- "[type == 'agent'] >> (stats.bandwidth.mean > 200)"
- -
title: TCP download title: TCP download
class: netperf_wrapper class: netperf_wrapper

View File

@ -13,7 +13,7 @@ execution:
class: iperf_graph class: iperf_graph
time: 60 time: 60
sla: sla:
- bandwidth.mean > 100 - "[type == 'agent'] >> (stats.bandwidth.mean > 100)"
- -
title: Iperf UDP 5 threads title: Iperf UDP 5 threads
class: iperf class: iperf

View File

@ -25,6 +25,7 @@ import yaml
from shaker.engine import aggregators from shaker.engine import aggregators
from shaker.engine import config from shaker.engine import config
from shaker.engine import sla
from shaker.engine import utils from shaker.engine import utils
@ -55,70 +56,58 @@ def calculate_stats(records, tests):
if summary: if summary:
summary.update(dict(scenario=scenario, test=test, summary.update(dict(scenario=scenario, test=test,
concurrency=concurrency, concurrency=concurrency,
type='agg_concurrency')) type='concurrency'))
aggregates.append(summary) aggregates.append(summary)
concurrency_aggregates.append(summary) concurrency_aggregates.append(summary)
per_test_summary = aggregator.test_summary(concurrency_aggregates) per_test_summary = aggregator.test_summary(concurrency_aggregates)
if per_test_summary: if per_test_summary:
per_test_summary.update(dict(scenario=scenario, test=test, per_test_summary.update(dict(scenario=scenario, test=test,
type='agg_test')) type='test'))
aggregates.append(per_test_summary) aggregates.append(per_test_summary)
return aggregates return aggregates
SLARecord = collections.namedtuple('SLARecord',
['sla', 'status', 'location', 'stats'])
def _verify_stats_against_sla(sla, record, location):
res = []
for term in sla:
status = utils.eval_expr(term, record['stats'])
sla_record = SLARecord(sla=term, status=status,
location=location, stats=record['stats'])
res.append(sla_record)
LOG.debug('SLA: %s', sla_record)
return res
def verify_sla(records, tests): def verify_sla(records, tests):
sla_results = [] record_map = collections.defaultdict(list) # test -> [record]
# test -> [sla] for r in records:
sla_map = dict((test_id, test['sla']) if 'sla' in tests[r['test']]:
for test_id, test in tests.items() if 'sla' in test) record_map[r['test']].append(r)
for record in records: sla_records = []
if (record['test'] in sla_map) and ('stats' in record): for test_id, records_per_test in record_map.items():
sla = sla_map[record['test']] for sla_expr in tests[test_id]['sla']:
path = [str(record[key]) sla_records += sla.eval_expr(sla_expr, records_per_test)
for key in ['test', 'concurrency', 'node', 'agent_id']
if key in record] return sla_records
info = _verify_stats_against_sla(sla, record, '.'.join(path))
sla_results += info
record['sla_info'] = info
return sla_results
def save_to_subunit(sla_res, subunit_filename): def _get_location(record):
return '.'.join([str(record.get(s))
for s in ['scenario', 'test', 'concurrency',
'node', 'agent_id']])
def save_to_subunit(sla_records, subunit_filename):
LOG.debug('Writing subunit stream to: %s', subunit_filename) LOG.debug('Writing subunit stream to: %s', subunit_filename)
fd = None fd = None
try: try:
fd = open(subunit_filename, 'w') fd = open(subunit_filename, 'w')
output = subunit_v2.StreamResultToBytes(fd) output = subunit_v2.StreamResultToBytes(fd)
for item in sla_res: for item in sla_records:
output.startTestRun() output.startTestRun()
test_id = item.location + ':' + item.sla test_id = _get_location(item.record) + ':' + item.expression
if not item.status: if not item.state:
output.status(test_id=test_id, file_name='results', output.status(test_id=test_id, file_name='results',
mime_type='text/plain; charset="utf8"', eof=True, mime_type='text/plain; charset="utf8"', eof=True,
file_bytes=yaml.safe_dump( file_bytes=yaml.safe_dump(
item.stats, default_flow_style=False)) item.record, default_flow_style=False))
output.status(test_id=test_id, output.status(test_id=test_id,
test_status='success' if item.status else 'fail') test_status='success' if item.state else 'fail')
output.stopTestRun() output.stopTestRun()
LOG.info('Subunit stream saved to: %s', subunit_filename) LOG.info('Subunit stream saved to: %s', subunit_filename)
@ -135,10 +124,10 @@ def generate_report(data, report_template, report_filename, subunit_filename):
data['records'] += calculate_stats(data['records'], data['tests']) data['records'] += calculate_stats(data['records'], data['tests'])
sla_res = verify_sla(data['records'], data['tests']) sla_records = verify_sla(data['records'], data['tests'])
if subunit_filename: if subunit_filename:
save_to_subunit(sla_res, subunit_filename) save_to_subunit(sla_records, subunit_filename)
# add more filters to jinja # add more filters to jinja
jinja_env = jinja2.Environment(variable_start_string='[[[', jinja_env = jinja2.Environment(variable_start_string='[[[',

View File

@ -92,7 +92,7 @@ def execute(quorum, execution, agents):
concurrency=len(selected_agents), concurrency=len(selected_agents),
test=test_title, test=test_title,
executor=test.get('class'), executor=test.get('class'),
type='raw', type='agent',
)) ))
records.append(data) records.append(data)

137
shaker/engine/sla.py Normal file
View File

@ -0,0 +1,137 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import collections
import operator as op
import re
SLAItem = collections.namedtuple('SLAItem', ['record', 'state', 'expression'])
# supported operators
operators = {ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,
ast.Div: op.truediv, ast.Pow: op.pow, ast.BitXor: op.xor,
ast.USub: op.neg, ast.Lt: op.lt, ast.Gt: op.gt, ast.LtE: op.le,
ast.GtE: op.ge, ast.Eq: op.eq, ast.And: op.and_, ast.Or: op.or_,
ast.Not: op.not_}
def eval_expr(expr, ctx=None):
"""Usage examples:
>>> eval_expr('2^6')
4
>>> eval_expr('2**6')
64
>>> eval_expr('1 + 2*3**(4^5) / (6 + -7)')
-5.0
>>> eval_expr('11 > a > 5', {'a': 7})
True
>>> eval_expr('2 + a.b', {'a': {'b': 2.2}})
4.2
"""
ctx = ctx or {}
return _eval(ast.parse(expr, mode='eval').body, ctx)
def _eval(node, ctx):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.Name):
return ctx.get(node.id)
elif isinstance(node, ast.Str):
return node.s
elif isinstance(node, ast.BinOp):
if isinstance(node.op, ast.RShift):
# left -- array, right -- condition
filtered = _eval(node.left, ctx)
result = []
for record in filtered:
state = _eval(node.right, record)
result.append(SLAItem(record=record, state=state,
expression=dump_ast_node(node.right)))
return result
elif isinstance(node.op, ast.BitAnd):
return re.match(_eval(node.right, ctx),
_eval(node.left, ctx)) is not None
else:
return operators[type(node.op)](_eval(node.left, ctx),
_eval(node.right, ctx))
elif isinstance(node, ast.UnaryOp):
return operators[type(node.op)](_eval(node.operand, ctx))
elif isinstance(node, ast.Compare):
x = _eval(node.left, ctx)
r = True
for i in range(len(node.ops)):
y = _eval(node.comparators[i], ctx)
r &= operators[type(node.ops[i])](x, y)
x = y
return r
elif isinstance(node, ast.BoolOp):
r = _eval(node.values[0], ctx)
for i in range(1, len(node.values)):
r = operators[type(node.op)](r, _eval(node.values[i], ctx))
return r
elif isinstance(node, ast.Attribute):
return _eval(node.value, ctx).get(node.attr)
elif isinstance(node, ast.List):
records = ctx
filtered = []
for record in records:
for el in node.elts:
if _eval(el, record):
filtered.append(record)
return filtered
else:
raise TypeError(node)
def dump_ast_node(node):
_operators = {ast.Add: '+', ast.Sub: '-', ast.Mult: '*',
ast.Div: '/', ast.Pow: '**', ast.BitXor: '^',
ast.BitAnd: '&', ast.BitOr: '|', ast.USub: '-',
ast.Lt: '<', ast.Gt: '>', ast.LtE: '<=', ast.GtE: '>=',
ast.Eq: '==', ast.And: 'and', ast.Or: 'or', ast.Not: 'not'}
def _format(node):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.Name):
return node.id
elif isinstance(node, ast.Str):
return '\'node.s\''
elif isinstance(node, ast.BinOp):
return '%s %s %s' % (_format(node.left), _operators[type(node.op)],
_format(node.right))
elif isinstance(node, ast.UnaryOp):
return '%s %s' % (_operators[type(node.op)], _format(node.operand))
elif isinstance(node, ast.Compare):
r = '%s' % _format(node.left)
for i in range(len(node.ops)):
y = _format(node.comparators[i])
r = '%s %s %s' % (r, _operators[type(node.ops[i])], y)
return r
elif isinstance(node, ast.BoolOp):
return (' %s ' % _operators[type(node.op)]).join(
_format(v) for v in node.values)
elif isinstance(node, ast.Attribute):
return '%s.%s' % (_format(node.value), node.attr)
elif isinstance(node, ast.Expression):
return '(%s)' % _format(node.body)
else:
raise TypeError(node)
return _format(node)

View File

@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ast
import operator as op
import logging as std_logging import logging as std_logging
import os import os
import random import random
@ -154,52 +151,3 @@ def flatten_dict(d, prefix='', sep='.'):
else: else:
res.append((path, v)) res.append((path, v))
return res return res
# supported operators
operators = {ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,
ast.Div: op.truediv, ast.Pow: op.pow, ast.BitXor: op.xor,
ast.USub: op.neg, ast.Lt: op.lt, ast.Gt: op.gt, ast.LtE: op.le,
ast.GtE: op.ge, ast.Eq: op.eq}
def eval_expr(expr, ctx=None):
"""Usage examples:
>>> eval_expr('2^6')
4
>>> eval_expr('2**6')
64
>>> eval_expr('1 + 2*3**(4^5) / (6 + -7)')
-5.0
>>> eval_expr('11 > a > 5', {'a': 7})
True
>>> eval_expr('2 + a.b', {'a': {'b': 2.2}})
4.2
"""
ctx = ctx or {}
return _eval(ast.parse(expr, mode='eval').body, ctx)
def _eval(node, ctx):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.Name):
return ctx.get(node.id)
elif isinstance(node, ast.BinOp):
return operators[type(node.op)](_eval(node.left, ctx),
_eval(node.right, ctx))
elif isinstance(node, ast.UnaryOp):
return operators[type(node.op)](_eval(node.operand, ctx))
elif isinstance(node, ast.Compare):
x = _eval(node.left, ctx)
r = True
for i in range(len(node.ops)):
y = _eval(node.comparators[i], ctx)
r &= operators[type(node.ops[i])](x, y)
x = y
return r
elif isinstance(node, ast.Attribute):
return _eval(node.value, ctx).get(node.attr)
else:
raise TypeError(node)

59
tests/test_report.py Normal file
View File

@ -0,0 +1,59 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from shaker.engine import report
from shaker.engine import sla
class TestReport(testtools.TestCase):
def test_verify_sla(self):
records = [{'type': 'agent', 'test': 'iperf_tcp',
'stats': {'bandwidth': {'mean': 700, 'min': 400}}},
{'type': 'agent', 'test': 'iperf_udp',
'stats': {'bandwidth': {'mean': 1000, 'min': 800}}},
{'type': 'agent', 'test': 'iperf_tcp',
'stats': {'bandwidth': {'mean': 850, 'min': 600}}}]
tests = {
'iperf_tcp': {
'sla': [
'[type == "agent"] >> (stats.bandwidth.mean > 800)',
'[type == "agent"] >> (stats.bandwidth.min > 500)',
],
},
'iperf_udp': {
'sla': [
'[type == "agent"] >> (stats.bandwidth.mean > 900)',
],
}
}
sla_records = report.verify_sla(records, tests)
self.assertIn(sla.SLAItem(records[0], False,
'stats.bandwidth.mean > 800'), sla_records)
self.assertIn(sla.SLAItem(records[0], False,
'stats.bandwidth.min > 500'), sla_records)
self.assertIn(sla.SLAItem(records[1], True,
'stats.bandwidth.mean > 900'), sla_records)
self.assertIn(sla.SLAItem(records[2], True,
'stats.bandwidth.mean > 800'), sla_records)
self.assertIn(sla.SLAItem(records[2], True,
'stats.bandwidth.min > 500'), sla_records)

65
tests/test_sla.py Normal file
View File

@ -0,0 +1,65 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import testtools
from shaker.engine import sla
class TestSla(testtools.TestCase):
def test_eval(self):
self.assertEqual(2 ** 6, sla.eval_expr('2**6'))
self.assertEqual(True, sla.eval_expr('11 > a > 5', {'a': 7}))
self.assertEqual(42, sla.eval_expr('2 + a.b', {'a': {'b': 40}}))
self.assertEqual(True, sla.eval_expr('11 > 7 and 5 < 6'))
self.assertEqual(False, sla.eval_expr('(not 11 > 7) or (not 5 < 6)'))
def test_eval_regex(self):
self.assertEqual(True, sla.eval_expr('"some text" & "\w+\s+\w+"'))
self.assertEqual(False, sla.eval_expr('"some text" & "\d+"'))
def test_eval_sla(self):
records = [{'type': 'agent', 'test': 'iperf_tcp',
'stats': {'bandwidth': {'mean': 700}}},
{'type': 'agent', 'test': 'iperf_udp',
'stats': {'bandwidth': {'mean': 1000}}},
{'type': 'node', 'test': 'iperf_tcp',
'stats': {'bandwidth': {'mean': 850}}}]
expr = 'stats.bandwidth.mean > 800'
sla_records = sla.eval_expr('[type == "agent"] >> (%s)' % expr,
records)
self.assertEqual([
sla.SLAItem(record=records[0], state=False, expression=expr),
sla.SLAItem(record=records[1], state=True, expression=expr)],
sla_records)
expr = 'stats.bandwidth.mean > 900'
sla_records = sla.eval_expr('[test == "iperf_udp", type == "node"] >> '
'(%s)' % expr, records)
self.assertEqual([
sla.SLAItem(record=records[1], state=True, expression=expr),
sla.SLAItem(record=records[2], state=False, expression=expr)],
sla_records)
def test_dump_ast_node(self):
self.assertEqual('(stats.bandwidth.mean > 900)', sla.dump_ast_node(
ast.parse('stats.bandwidth.mean > 900', mode='eval')))
expr = ('(stats.bandwidth.mean > 900 and not stats.ping.max < 0.5 and '
'stats.ping.mean < 0.35)')
self.assertEqual(expr,
sla.dump_ast_node(ast.parse(expr, mode='eval')))

View File

@ -37,8 +37,3 @@ class TestUtils(testtools.TestCase):
self.assertEqual( self.assertEqual(
{'a': 1, 'b.c': 2, 'b.d': 3}, {'a': 1, 'b.c': 2, 'b.d': 3},
dict(utils.flatten_dict({'a': 1, 'b': {'c': 2, 'd': 3}}))) dict(utils.flatten_dict({'a': 1, 'b': {'c': 2, 'd': 3}})))
def test_eval(self):
self.assertEqual(2 ** 6, utils.eval_expr('2**6'))
self.assertEqual(True, utils.eval_expr('11 > a > 5', {'a': 7}))
self.assertEqual(42, utils.eval_expr('2 + a.b', {'a': {'b': 40}}))