Browse Source

Implement aggregation stage

changes/96/298296/1
Ilya Shakhat 3 years ago
parent
commit
abab0404cd

+ 70
- 0
performa/engine/aggregator.py View File

@@ -0,0 +1,70 @@
1
+# Copyright (c) 2016 OpenStack Foundation
2
+#
3
+# Licensed under the Apache License, Version 2.0 (the "License");
4
+# you may not use this file except in compliance with the License.
5
+# You may obtain a copy of the License at
6
+#
7
+#   http://www.apache.org/licenses/LICENSE-2.0
8
+#
9
+# Unless required by applicable law or agreed to in writing, software
10
+# distributed under the License is distributed on an "AS IS" BASIS,
11
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
+# implied.
13
+# See the License for the specific language governing permissions and
14
+# limitations under the License.
15
+
16
+from oslo_log import log as logging
17
+import pymongo
18
+
19
+from performa.engine import utils
20
+
21
+LOG = logging.getLogger(__name__)
22
+
23
+
24
+def aggregate(scenario, mongo_url, db_name, tag):
25
+    if 'aggregation' not in scenario:
26
+        return  # nothing to do
27
+
28
+    LOG.info('Running aggregation')
29
+
30
+    connection_params = utils.parse_url(mongo_url)
31
+    mongo_client = pymongo.MongoClient(**connection_params)
32
+    db = mongo_client.get_database(db_name)
33
+
34
+    aggregation = scenario['aggregation']
35
+
36
+    records_collection = db.get_collection('records')
37
+    series_collection = db.get_collection('series')
38
+
39
+    for op_group in aggregation:
40
+        for op, op_params in op_group.items():
41
+            if op == 'update':
42
+
43
+                select_query = op_params['query']
44
+                values_query = op_params['values']
45
+                values_pipeline = values_query['pipeline']
46
+
47
+                select_query['tag'] = tag
48
+                select_query['status'] = 'OK'
49
+
50
+                for rec in records_collection.find(select_query):
51
+                    start = rec['start']
52
+                    stop = rec['end']  # todo rename field into 'stop'
53
+
54
+                    series_pipeline = [
55
+                        {'$match': {'$and': [
56
+                            {'tag': tag},
57
+                            {'timestamp': {'$gte': start}},
58
+                            {'timestamp': {'$lte': stop}}
59
+                        ]}}
60
+                    ]
61
+                    series_pipeline.extend(values_pipeline)
62
+
63
+                    point = next(series_collection.aggregate(series_pipeline))
64
+                    del point['_id']  # to avoid overwriting
65
+                    rec.update(point)
66
+
67
+                    records_collection.update_one({'_id': rec['_id']},
68
+                                                  {'$set': point})
69
+
70
+                    LOG.debug('Updated record: %s', rec)

+ 3
- 0
performa/engine/main.py View File

@@ -19,6 +19,7 @@ from oslo_config import cfg
19 19
 from oslo_log import log as logging
20 20
 import yaml
21 21
 
22
+from performa.engine import aggregator
22 23
 from performa.engine import ansible_runner
23 24
 from performa.engine import config
24 25
 from performa.engine import player
@@ -60,6 +61,8 @@ def main():
60 61
 
61 62
     storage.store_data(cfg.CONF.mongo_url, cfg.CONF.mongo_db, records, series)
62 63
 
64
+    aggregator.aggregate(scenario, cfg.CONF.mongo_url, cfg.CONF.mongo_db, tag)
65
+
63 66
     report.generate_report(scenario, base_dir, cfg.CONF.mongo_url,
64 67
                            cfg.CONF.mongo_db, cfg.CONF.book, tag)
65 68
 

+ 24
- 1
performa/scenarios/db/sysbench.rst View File

@@ -7,7 +7,7 @@ This is the report of execution test plan
7 7
 Results
8 8
 ^^^^^^^
9 9
 
10
-Chart and table:
10
+Queries per second depending on threads count:
11 11
 
12 12
 {{'''
13 13
     title: Queries per second
@@ -30,6 +30,29 @@ Chart and table:
30 30
 ''' | chart
31 31
 }}
32 32
 
33
+Queries per second and mysqld CPU consumption depending on threads count:
34
+
35
+{{'''
36
+    title: Queries and and CPU util per second
37
+    axes:
38
+      x: threads
39
+      y: queries per sec
40
+      y2: mysqld CPU consumption, %
41
+    chart: line
42
+    pipeline:
43
+    - { $match: { task: sysbench_oltp, status: OK }}
44
+    - { $group: { _id: { threads: "$threads" },
45
+                  queries_total_per_sec: { $avg: { $divide: ["$queries_total", "$duration"] }},
46
+                  mysqld_total: { $avg: "$mysqld_total" }
47
+                }}
48
+    - { $project: { x: "$_id.threads",
49
+                    y: "$queries_total_per_sec",
50
+                    y2: { $multiply: [ "$mysqld_total", 100 ] }
51
+                  }}
52
+    - { $sort: { x: 1 }}
53
+''' | chart
54
+}}
55
+
33 56
 .. references:
34 57
 
35 58
 .. _Sysbench: https://github.com/akopytov/sysbench

+ 11
- 1
performa/scenarios/db/sysbench.yaml View File

@@ -23,7 +23,7 @@ execution:
23 23
   -
24 24
     hosts: $target
25 25
     matrix:
26
-      threads: [ 10, 20, 30 ]
26
+      threads: [ 10, 20, 30, 40, 50, 60 ]
27 27
     tasks:
28 28
     - sysbench_oltp:
29 29
         duration: 10
@@ -34,5 +34,15 @@ execution:
34 34
         command: stop
35 35
         labels: [ CPU, PRC, PRM ]
36 36
 
37
+aggregation:
38
+  -
39
+    update:
40
+      query:
41
+        { task: sysbench_oltp }
42
+      values:
43
+        pipeline:
44
+        - { $match: { task: atop, status: OK, label: PRC, name: mysqld }}
45
+        - { $group: { _id: null, mysqld_sys: { $avg: "$sys" }, mysqld_user: { $avg: "$user" }, mysqld_total: { $avg: { $add: [ "$sys", "$user" ] }} }}
46
+
37 47
 report:
38 48
   template: sysbench.rst

Loading…
Cancel
Save