Browse Source

Store time-series data into Mongo

Ilya Shakhat 3 years ago
parent
commit
2c4e5167ee

+ 2
- 5
performa/engine/main.py View File

@@ -53,12 +53,9 @@ def main():
53 53
         tag = utils.random_string()
54 54
         LOG.info('Using auto-generated tag "%s"', tag)
55 55
 
56
-    records = player.play_scenario(scenario, tag)
56
+    records, series = player.play_scenario(scenario, tag)
57 57
 
58
-    if records:
59
-        storage.store_data(records, cfg.CONF.mongo_url, cfg.CONF.mongo_db)
60
-    else:
61
-        LOG.warning('Execution generated no records')
58
+    storage.store_data(cfg.CONF.mongo_url, cfg.CONF.mongo_db, records, series)
62 59
 
63 60
     report.generate_report(scenario, base_dir, cfg.CONF.mongo_url,
64 61
                            cfg.CONF.mongo_db, cfg.CONF.book, tag)

+ 23
- 19
performa/engine/player.py View File

@@ -48,6 +48,7 @@ def play_setup(setup):
48 48
 
49 49
 def play_execution(execution_playbook):
50 50
     records = []
51
+    series = []
51 52
 
52 53
     for play in execution_playbook:
53 54
         matrix = play.get('matrix')
@@ -62,35 +63,37 @@ def play_execution(execution_playbook):
62 63
 
63 64
             for command_result in command_results:
64 65
                 if command_result.get('status') == 'OK':
65
-                    record = dict(id=utils.make_id(),
66
+                    payload = command_result['payload']
67
+
68
+                    common = dict(id=utils.make_id(),
66 69
                                   host=command_result['host'],
67 70
                                   status=command_result['status'],
68 71
                                   task=command_result['task'])
69
-                    payload = command_result['payload']
70
-                    record.update(payload['invocation']['module_args'])
71
-                    record.update(payload)
72
+                    common.update(payload['invocation']['module_args'])
72 73
 
73
-                    # keep flat values only
74
-                    for k, v in record.items():
75
-                        if isinstance(v, list) or isinstance(v, dict):
76
-                            del record[k]
74
+                    if 'records' in payload:
75
+                        for rec in payload['records']:
76
+                            rec.update(common)
77
+                            records.append(rec)
78
+                            LOG.debug('New record: %s', rec)
77 79
 
78
-                    if 'stdout' in record:
79
-                        del record['stdout']
80
+                    if 'series' in payload:
81
+                        for rec in payload['series']:
82
+                            rec.update(common)
83
+                            series.append(rec)
84
+                            LOG.debug('New time series: %s', rec)
80 85
 
81
-                    LOG.debug('Record: %s', record)
82
-                    records.append(record)
86
+    return records, series
83 87
 
84
-    return records
85 88
 
86
-
87
-def tag_records(records, tag):
89
+def add_tag(records, tag):
88 90
     for r in records:
89 91
         r['tag'] = tag
90 92
 
91 93
 
92 94
 def play_scenario(scenario, tag):
93
-    records = {}
95
+    records = []
96
+    series = []
94 97
 
95 98
     if 'setup' in scenario:
96 99
         play_setup(scenario['setup'])
@@ -98,7 +101,8 @@ def play_scenario(scenario, tag):
98 101
     if 'execution' in scenario:
99 102
         execution = scenario['execution']
100 103
 
101
-        records = play_execution(execution)
102
-        tag_records(records, tag)
104
+        records, series = play_execution(execution)
105
+        add_tag(records, tag)
106
+        add_tag(series, tag)
103 107
 
104
-    return records
108
+    return records, series

+ 8
- 3
performa/engine/storage.py View File

@@ -21,12 +21,17 @@ from performa.engine import utils
21 21
 LOG = logging.getLogger(__name__)
22 22
 
23 23
 
24
-def store_data(records, mongo_url, mongo_db):
24
+def store_data(mongo_url, mongo_db, records, series):
25 25
     LOG.info('Store data to Mongo: %s', mongo_url)
26 26
 
27 27
     connection_params = utils.parse_url(mongo_url)
28 28
     mongo_client = pymongo.MongoClient(**connection_params)
29 29
     db = mongo_client.get_database(mongo_db)
30 30
 
31
-    records_collection = db.get_collection('records')
32
-    records_collection.insert_many(records)
31
+    if records:
32
+        records_collection = db.get_collection('records')
33
+        records_collection.insert_many(records)
34
+
35
+    if series:
36
+        series_collection = db.get_collection('series')
37
+        series_collection.insert_many(series)

+ 3
- 1
performa/modules/atop.py View File

@@ -1,8 +1,10 @@
1 1
 #!/usr/bin/python
2 2
 
3
+import os
3 4
 import re
5
+import tempfile
4 6
 
5
-ATOP_FILE_NAME = '/tmp/performa.atop'
7
+ATOP_FILE_NAME = os.path.join(tempfile.gettempdir(), 'performa.atop')
6 8
 UNIQUE_NAME = 'performa_atop'
7 9
 
8 10
 PREFIX_PATTERN = (

+ 8
- 6
performa/modules/sysbench_oltp.py View File

@@ -68,17 +68,19 @@ def main():
68 68
            'run'
69 69
            ) % module.params
70 70
 
71
+    start = int(time.time())
71 72
     rc, stdout, stderr = module.run_command(cmd)
72
-
73
-    result = dict(changed=True, rc=rc, stdout=stdout, stderr=stderr, cmd=cmd)
73
+    end = int(time.time())
74 74
 
75 75
     try:
76
-        result.update(parse_sysbench_oltp(stdout))
76
+        parsed = parse_sysbench_oltp(stdout)
77
+        parsed['start'] = start
78
+        parsed['end'] = end
79
+
80
+        result = dict(records=[parsed])
77 81
         module.exit_json(**result)
78 82
     except Exception as e:
79
-        result['exception'] = e
80
-
81
-    module.fail_json(**result)
83
+        module.fail_json(msg=e, rc=rc, stderr=stderr, stdout=stdout)
82 84
 
83 85
 
84 86
 from ansible.module_utils.basic import *  # noqa

Loading…
Cancel
Save