Filter atop series inside the module

Introduce ability to filter series inside atop module.
The feature is configured via parameter 'filter', which
is a dictinary where keys as param names and values play role
of filter.

Change-Id: I553558a63879a3e6f66522a040b74b91e4884800
This commit is contained in:
Ilya Shakhat 2016-03-31 19:36:37 +03:00
parent 03b81b9f4c
commit 6e684cd841
5 changed files with 88 additions and 17 deletions

View File

@ -87,6 +87,8 @@ def play_execution(runner, execution_playbook):
rec.update(common) rec.update(common)
series.append(rec) series.append(rec)
LOG.debug('New time series: %s', rec) LOG.debug('New time series: %s', rec)
else:
LOG.warning('Play failed: %s', command_result)
return records, series return records, series

View File

@ -1,5 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
import functools
import os import os
import re import re
import tempfile import tempfile
@ -145,10 +146,7 @@ def normalize_point(point):
return point return point
def parse_output(raw, filter_labels): def parse_output(raw):
filter_labels = set(filter_labels)
series = []
active = False active = False
for line in raw.split('\n'): for line in raw.split('\n'):
if line == 'SEP': if line == 'SEP':
@ -162,11 +160,44 @@ def parse_output(raw, filter_labels):
m = re.match(pattern, line) m = re.match(pattern, line)
if m: if m:
point = m.groupdict() point = m.groupdict()
if point['label'] in filter_labels: yield normalize_point(point)
series.append(normalize_point(point))
break break
return series
def make_filter_funcs(filters):
def _in_list(point, name, lst):
return point.get(name) in lst
def _match(point, name, patt):
return re.match(patt, point.get(name, ''))
funcs = []
for name, values in filters.items():
fn = None
if isinstance(values, list):
fn = functools.partial(_in_list, name=name, lst=set(values))
elif isinstance(values, str):
fn = functools.partial(_match, name=name, patt=re.compile(values))
if fn:
funcs.append(fn)
return funcs
def run_filter_funcs(points, funcs):
for point in points:
accepted = True
for fn in funcs:
if not fn(point):
accepted = False
break
if accepted:
yield point
def parse(raw, filters):
funcs = make_filter_funcs(filters)
return list(run_filter_funcs(parse_output(raw), funcs))
def start(module): def start(module):
@ -206,13 +237,17 @@ def stop(module):
# grab data # grab data
labels = module.params['labels'] or ALL_LABELS labels = module.params['labels'] or ALL_LABELS
ft = module.params.get('filter')
if ft and 'label' in ft:
labels = ft['label']
cmd = ('atop -r %(file)s -P %(labels)s' % cmd = ('atop -r %(file)s -P %(labels)s' %
dict(file=ATOP_FILE_NAME, labels=','.join(labels))) dict(file=ATOP_FILE_NAME, labels=','.join(labels)))
rc, stdout, stderr = module.run_command(cmd) rc, stdout, stderr = module.run_command(cmd)
try: try:
series = parse_output(stdout, labels) series = parse(stdout, module.params.get('filter', {}))
module.exit_json(series=series) module.exit_json(series=series)
except Exception as e: except Exception as e:
module.fail_json(msg=str(e), stderr=stderr, rc=rc) module.fail_json(msg=str(e), stderr=stderr, rc=rc)
@ -224,6 +259,7 @@ def main():
command=dict(required=True, choices=['start', 'stop']), command=dict(required=True, choices=['start', 'stop']),
interval=dict(type='int', default=1), interval=dict(type='int', default=1),
labels=dict(type='list'), labels=dict(type='list'),
filter=dict(type='dict', default={}),
)) ))
command = module.params['command'] command = module.params['command']

View File

@ -35,7 +35,7 @@ execution:
threads: [ 5, 10, 15, 20, 30, 40, 50 ] threads: [ 5, 10, 15, 20, 30, 40, 50 ]
tasks: tasks:
- sysbench_oltp: - sysbench_oltp:
duration: 60 duration: 10
mysql_host: {{ mysql_endpoint }} mysql_host: {{ mysql_endpoint }}
mysql_port: {{ mysql_port }} mysql_port: {{ mysql_port }}
mysql_db: sbtest mysql_db: sbtest
@ -44,7 +44,9 @@ execution:
tasks: tasks:
- atop: - atop:
command: stop command: stop
labels: [ PRC ] filter:
name: mysqld
label: [ PRC ]
aggregation: aggregation:
- -

View File

@ -87,7 +87,9 @@ execution:
tasks: tasks:
- atop: - atop:
command: stop command: stop
labels: [ PRC ] filter:
name: beam.*
label: [ PRC ]
aggregation: aggregation:
- -

View File

@ -36,7 +36,8 @@ class TestAtop(testtools.TestCase):
'sys': 0.04, 'ticks_per_second': 100, 'time': '10:01:05', 'sys': 0.04, 'ticks_per_second': 100, 'time': '10:01:05',
'timestamp': 1456480865, 'user': 0.04, 'wait': 0.0}] 'timestamp': 1456480865, 'user': 0.04, 'wait': 0.0}]
self.assertEqual(expected, atop.parse_output(_read_sample(), ['CPU'])) self.assertEqual(expected,
atop.parse(_read_sample(), dict(label=['CPU'])))
def test_parse_cpu(self): def test_parse_cpu(self):
needle = {'cpu_id': 2, 'date': '2016/02/26', 'guest': 0.0, needle = {'cpu_id': 2, 'date': '2016/02/26', 'guest': 0.0,
@ -45,7 +46,8 @@ class TestAtop(testtools.TestCase):
'sys': 0.03, 'ticks_per_second': 100, 'time': '10:01:05', 'sys': 0.03, 'ticks_per_second': 100, 'time': '10:01:05',
'timestamp': 1456480865, 'user': 0.03, 'wait': 0.0} 'timestamp': 1456480865, 'user': 0.03, 'wait': 0.0}
self.assertIn(needle, atop.parse_output(_read_sample(), ['cpu'])) self.assertIn(needle,
atop.parse(_read_sample(), dict(label=['cpu'])))
def test_parse_mem(self): def test_parse_mem(self):
expected = [ expected = [
@ -58,7 +60,8 @@ class TestAtop(testtools.TestCase):
'label': 'MEM', 'page_size': 4096, 'phys': 8373075968, 'label': 'MEM', 'page_size': 4096, 'phys': 8373075968,
'slab': 298115072, 'time': '10:01:05', 'timestamp': 1456480865}] 'slab': 298115072, 'time': '10:01:05', 'timestamp': 1456480865}]
self.assertEqual(expected, atop.parse_output(_read_sample(), ['MEM'])) self.assertEqual(expected,
atop.parse(_read_sample(), dict(label=['MEM'])))
def test_parse_net(self): def test_parse_net(self):
needle = {'date': '2016/02/26', 'host': 'host', 'interval': 1, needle = {'date': '2016/02/26', 'host': 'host', 'interval': 1,
@ -66,7 +69,8 @@ class TestAtop(testtools.TestCase):
'label': 'NET', 'tcp_rx': 0, 'tcp_tx': 0, 'time': '10:01:04', 'label': 'NET', 'tcp_rx': 0, 'tcp_tx': 0, 'time': '10:01:04',
'timestamp': 1456480864, 'udp_rx': 0, 'udp_tx': 0} 'timestamp': 1456480864, 'udp_rx': 0, 'udp_tx': 0}
self.assertIn(needle, atop.parse_output(_read_sample(), ['NET'])) self.assertIn(needle,
atop.parse(_read_sample(), dict(label=['NET'])))
def test_parse_prc(self): def test_parse_prc(self):
needle = {'current_cpu': 2, 'date': '2016/02/26', 'host': 'host', needle = {'current_cpu': 2, 'date': '2016/02/26', 'host': 'host',
@ -76,7 +80,8 @@ class TestAtop(testtools.TestCase):
'sys': 0.02, 'ticks_per_second': 100, 'time': '10:01:04', 'sys': 0.02, 'ticks_per_second': 100, 'time': '10:01:04',
'timestamp': 1456480864, 'user': 0.01} 'timestamp': 1456480864, 'user': 0.01}
self.assertIn(needle, atop.parse_output(_read_sample(), ['PRC'])) self.assertIn(needle,
atop.parse(_read_sample(), dict(label=['PRC'])))
def test_parse_prm(self): def test_parse_prm(self):
needle = {'date': '2016/02/26', 'host': 'host', 'interval': 1, needle = {'date': '2016/02/26', 'host': 'host', 'interval': 1,
@ -87,4 +92,28 @@ class TestAtop(testtools.TestCase):
'timestamp': 1456480865, 'virtual': 17412096, 'timestamp': 1456480865, 'virtual': 17412096,
'virtual_growth': 0} 'virtual_growth': 0}
self.assertIn(needle, atop.parse_output(_read_sample(), ['PRM'])) self.assertIn(needle,
atop.parse(_read_sample(), dict(label=['PRM'])))
def test_parse_match_name_regex(self):
expected = [{'current_cpu': 2, 'date': '2016/02/26', 'host': 'host',
'interval': 1, 'label': 'PRC', 'name': 'dstat', 'nice': 0,
'pid': 11014, 'priority': 120, 'realtime_priority': 0,
'scheduling_policy': 0, 'sleep_avg': 0, 'state': 'S',
'sys': 0.02, 'ticks_per_second': 100, 'time': '10:01:04',
'timestamp': 1456480864, 'user': 0.01},
{'current_cpu': 2, 'date': '2016/02/26', 'host': 'host',
'interval': 1, 'label': 'PRC', 'name': 'dstat', 'nice': 0,
'pid': 11014, 'priority': 120, 'realtime_priority': 0,
'scheduling_policy': 0, 'sleep_avg': 0, 'state': 'S',
'sys': 0.0, 'ticks_per_second': 100, 'time': '10:01:05',
'timestamp': 1456480865, 'user': 0.02}]
filter = {
'name': 'dstat',
'label': ['PRC'],
}
self.assertEqual(expected, atop.parse(_read_sample(), filter))
def test_parse_no_filter(self):
self.assertEqual(43, len(atop.parse(_read_sample(), {})))