Browse Source

Ceilometer Meter Query and Aggregation Implemented

Implemented Meter Query Utility that supports operations such as
eq, ne, gt, lt, gte, lte. The implementation also supports multiple
filters.

Implemented Meter Statistics Aggregation where client can specify
which aggregation function to return.

Change-Id: I5b19faa4a0d46befbe6e84c975af084cdc0d9d12
Xiao Tan 2 years ago
parent
commit
b536e94286

+ 2
- 1
kiloeyes/tests/v2/elasticsearch/test_meters.py View File

@@ -177,7 +177,8 @@ class TestMeterDispatcher(base.BaseTestCase):
177 177
         self.assertEqual(obj[0]['project_id'],
178 178
                          '35b17138-b364-4e6a-a131-8f3099c5be68')
179 179
         self.assertEqual(obj[0]['counter_volume'], 4)
180
-        self.assertEqual(obj[0]['timestamp'], 1461337094000)
180
+        self.assertEqual(obj[0]['timestamp'],
181
+                         tu.iso8601_from_timestamp(1461337094000))
181 182
         self.assertEqual(len(obj), 1)
182 183
 
183 184
     def test_do_get_statistics(self):

+ 180
- 42
kiloeyes/v2/elasticsearch/meters.py View File

@@ -12,7 +12,9 @@
12 12
 # License for the specific language governing permissions and limitations
13 13
 # under the License.
14 14
 
15
+import calendar
15 16
 import datetime
17
+import dateutil.parser
16 18
 import falcon
17 19
 from oslo_config import cfg
18 20
 from oslo_log import log
@@ -24,7 +26,6 @@ from kiloeyes.common import kafka_conn
24 26
 from kiloeyes.common import namespace
25 27
 from kiloeyes.common import resource_api
26 28
 from kiloeyes.common import timeutils as tu
27
-from kiloeyes.v2.elasticsearch import metrics
28 29
 
29 30
 try:
30 31
     import ujson as json
@@ -56,6 +57,146 @@ LOG = log.getLogger(__name__)
56 57
 UPDATED = str(datetime.datetime(2014, 1, 1, 0, 0, 0))
57 58
 
58 59
 
60
+class ParamUtil(object):
61
+
62
+    @staticmethod
63
+    def process_one_filter(field, op, value, must, must_not):
64
+        if (not field or not field.strip() or
65
+                not value or not value.strip()):
66
+            return
67
+        # default op is equal
68
+        # convert ceilometer op to elasticsearch op
69
+        if not op or not op.strip():
70
+            op = 'eq'
71
+        elif op.strip() == 'le':
72
+            op = 'lte'
73
+        elif op.strip() == 'ge':
74
+            op = 'gte'
75
+
76
+        # if field is timestamp, convert it to correct format
77
+        if field.strip() == 'timestamp':
78
+            value = dateutil.parser.parse(value.strip())
79
+            value = value.timetuple()
80
+            value = calendar.timegm(value) * 1000
81
+        elif field.strip() == 'value':
82
+            value = float(value.strip())
83
+
84
+        # construct query based on op
85
+        if op.strip() == 'eq':
86
+            must.append({'match': {field.strip(): value}})
87
+        elif op.strip() == 'ne':
88
+            must_not.append({'match': {field.strip(): value}})
89
+        # range search
90
+        else:
91
+            must.append({'range': {field.strip(): {op.strip(): value}}})
92
+        return
93
+
94
+    @staticmethod
95
+    def filtering(req, _agg, size, query):
96
+        must = []
97
+        must_not = []
98
+        # default body
99
+        body = '{"aggs":' + _agg + '}'
100
+        # process ceilometer API query
101
+        # query as json, can support multiple filters
102
+        if(req.content_type == 'application/json'):
103
+            if not query:
104
+                query = req.stream.read()
105
+            if not query:
106
+                return body
107
+            if 'q' in json.loads(query):
108
+                objs = json.loads(query)['q']
109
+            else:
110
+                return body
111
+            for obj in objs:
112
+                field = obj['field']
113
+                op = obj['op']
114
+                value = obj['value']
115
+                ParamUtil.process_one_filter(field, op, value, must, must_not)
116
+        # query as param, only support one filter
117
+        else:
118
+            field = req.get_param('q.field')
119
+            op = req.get_param('q.op')
120
+            value = req.get_param('q.value')
121
+            ParamUtil.process_one_filter(field, op, value, must, must_not)
122
+
123
+        q = ''
124
+        if must:
125
+            q = q + ('"must":' + json.dumps(must))
126
+        elif must_not:
127
+            q = q + ('"must_not":' + json.dumps(must_not))
128
+        if q != '':
129
+            body = ('{"query":{"bool":{' + q + '}},'
130
+                    '"size":' + str(size) + ','
131
+                    '"aggs":' + _agg + '}')
132
+        return body
133
+
134
+    @staticmethod
135
+    def process_one_aggregate(func, _agg, count):
136
+        if not func or not func.strip():
137
+            return _agg
138
+        ret_agg = ''
139
+        # use sep ',' to seperate aggregations
140
+        sep = ''
141
+        if count != 0:
142
+            sep = ','
143
+        if func.strip() == 'avg':
144
+            ret_agg = _agg + (sep + '"average":{"avg":{"field":"value"}}')
145
+        elif func.strip() == 'max':
146
+            ret_agg = _agg + (sep + '"maximum":{"max":{"field":"value"}}')
147
+        elif func.strip() == 'min':
148
+            ret_agg = _agg + (sep + '"minimum":{"min":{"field":"value"}}')
149
+        elif func.strip() == 'sum':
150
+            ret_agg = _agg + (sep + '"sum":{"sum":{"field":"value"}}')
151
+        elif func.strip() == 'count' or func.strip() == 'value_count':
152
+            ret_agg = _agg + (sep +
153
+                              '"count":{"value_count":{"field":"value"}}')
154
+        elif func.strip() == 'stats':
155
+            ret_agg = _agg + (sep + '"statistics":{"stats":{"field":"value"}}')
156
+        return ret_agg
157
+
158
+    @staticmethod
159
+    def aggregate(req):
160
+        # process meter statistics selectable aggregates
161
+        _stats_agg = ''
162
+        len_aggs = 1
163
+        # aggregate as json, support multiple aggregations
164
+        if(req.content_type == 'application/json'):
165
+            query = req.stream.read()
166
+            if not query:
167
+                return ('"statistics":{"stats":{"field":"value"}}', query)
168
+            if 'aggregate' in json.loads(query):
169
+                objs = json.loads(query)['aggregate']
170
+            else:
171
+                return ('"statistics":{"stats":{"field":"value"}}', query)
172
+            len_aggs = len(objs)
173
+            for i in xrange(0, len_aggs):
174
+                obj = objs[i]
175
+                func = obj['func']
176
+                _stats_agg = ParamUtil.process_one_aggregate(func,
177
+                                                             _stats_agg, i)
178
+        # aggregate as param, only support one aggregation
179
+        else:
180
+            query = None
181
+            func = req.get_param('aggregate.func')
182
+            _stats_agg = ParamUtil.process_one_aggregate(func, _stats_agg, 0)
183
+
184
+        if _stats_agg == '':
185
+            _stats_agg = ('"statistics":{"stats":{"field":"value"}}')
186
+        # return query and pass it to ParamUtil.common to process query
187
+        # it needs to be return because req.stream.read can only be read once
188
+        return (_stats_agg, query)
189
+
190
+    @staticmethod
191
+    def period(req):
192
+        try:
193
+            if req.get_param('period'):
194
+                return str(int(req.get_param('period'))) + 's'
195
+        except Exception:
196
+            pass
197
+        return '300s'
198
+
199
+
59 200
 class MeterDispatcher(object):
60 201
     def __init__(self, global_conf):
61 202
         LOG.debug('initializing V2API!')
@@ -106,7 +247,7 @@ class MeterDispatcher(object):
106 247
         {"by_name":{"terms":{"field":"name","size":%(size)d},
107 248
         "aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d},
108 249
         "aggs":{"meters":{"top_hits":{"_source":{"exclude":
109
-        ["dimensions_hash","timestamp","value"]},"size":1}}}}}}}
250
+        ["dimensions_hash"]},"size":1}}}}}}}
110 251
         """
111 252
 
112 253
         self._oldsample_agg = """
@@ -122,8 +263,7 @@ class MeterDispatcher(object):
122 263
         "size":%(size)d},"aggs":{"dimension":{"top_hits":{"_source":
123 264
         {"exclude":["dimensions_hash","timestamp","value"]},"size":1}},
124 265
         "periods":{"date_histogram":{"field":"timestamp",
125
-        "interval":"%(period)s"},"aggs":{"statistics":{"stats":
126
-        {"field":"value"}}}}}}}}}
266
+        "interval":"%(period)s"},"aggs":{%(agg)s}}}}}}}
127 267
         """
128 268
 
129 269
         self.setup_index_template()
@@ -166,15 +306,8 @@ class MeterDispatcher(object):
166 306
         LOG.debug('The meters GET request is received')
167 307
 
168 308
         # process query condition
169
-        query = []
170
-        metrics.ParamUtil.common(req, query)
171 309
         _meters_ag = self._meters_agg % {"size": self.size}
172
-        if query:
173
-            body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
174
-                    '"size":' + str(self.size) + ','
175
-                    '"aggs":' + _meters_ag + '}')
176
-        else:
177
-            body = '{"aggs":' + _meters_ag + '}'
310
+        body = ParamUtil.filtering(req, _meters_ag, self.size, None)
178 311
 
179 312
         LOG.debug('Request body:' + body)
180 313
         LOG.debug('Request url:' + self._query_url)
@@ -201,6 +334,8 @@ class MeterDispatcher(object):
201 334
                         '"source":' + json.dumps(_source['user_agent']) + ','
202 335
                         '"type":' + json.dumps(_type) + ','
203 336
                         '"unit":null,'
337
+                        '"value":' + json.dumps(_source['value']) + ','
338
+                        '"timestamp":' + json.dumps(_source['timestamp']) + ','
204 339
                         '"user_id":' + json.dumps(_source['user_id']) + '}')
205 340
                 if flag['is_first']:
206 341
                     flag['is_first'] = False
@@ -230,15 +365,8 @@ class MeterDispatcher(object):
230 365
         LOG.debug('The meter %s sample GET request is received' % meter_name)
231 366
 
232 367
         # process query condition
233
-        query = []
234
-        metrics.ParamUtil.common(req, query)
235 368
         _meter_ag = self._oldsample_agg % {"size": self.size}
236
-        if query:
237
-            body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
238
-                    '"size":' + str(self.size) + ','
239
-                    '"aggs":' + _meter_ag + '}')
240
-        else:
241
-            body = '{"aggs":' + _meter_ag + '}'
369
+        body = ParamUtil.filtering(req, _meter_ag, self.size, None)
242 370
 
243 371
         # modify the query url to filter out name
244 372
         query_url = []
@@ -253,7 +381,6 @@ class MeterDispatcher(object):
253 381
 
254 382
         LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
255 383
         res_data = self._get_agg_response(es_res)
256
-        LOG.debug('@$Result data is %s\n' % res_data)
257 384
         if res_data:
258 385
             # convert the response into ceilometer meter OldSample format
259 386
             aggs = res_data['by_name']['buckets']
@@ -275,7 +402,8 @@ class MeterDispatcher(object):
275 402
                         json.dumps(_source['tenant_id']) + ','
276 403
                         '"resource_metadata":null,'
277 404
                         '"source":' + json.dumps(_source['user_agent']) + ','
278
-                        '"timestamp":' + json.dumps(_source['timestamp']) + ','
405
+                        '"timestamp":"' +
406
+                        tu.iso8601_from_timestamp(_source['timestamp']) + '",'
279 407
                         '"user_id":' + json.dumps(_source['user_id']) + '}')
280 408
                 if flag['is_first']:
281 409
                     flag['is_first'] = False
@@ -300,19 +428,12 @@ class MeterDispatcher(object):
300 428
     def get_meter_statistics(self, req, res, meter_name):
301 429
         LOG.debug('The meter %s statistics GET request is received' %
302 430
                   meter_name)
303
-        # process query conditions
304
-        query = []
305
-        metrics.ParamUtil.common(req, query)
306
-        period = metrics.ParamUtil.period(req)
307
-
308
-        _stats_ag = (self._meter_stats_agg %
309
-                     {"size": self.size, "period": period})
310
-        if query:
311
-            body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
312
-                    '"size":' + str(self.size) + ','
313
-                    '"aggs":' + _stats_ag + '}')
314
-        else:
315
-            body = '{"aggs":' + _stats_ag + '}'
431
+        # process query condition
432
+        (_agg, query) = ParamUtil.aggregate(req)
433
+        period = ParamUtil.period(req)
434
+        _stats_ag = self._meter_stats_agg % {"size": self.size,
435
+                                             "period": period, "agg": _agg}
436
+        body = ParamUtil.filtering(req, _stats_ag, self.size, query)
316 437
 
317 438
         # modify the query url to filter out name
318 439
         query_url = []
@@ -323,14 +444,14 @@ class MeterDispatcher(object):
323 444
         es_res = requests.post(query_url, data=body)
324 445
         res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code)
325 446
 
447
+        LOG.debug('Request body:' + body)
326 448
         LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
327 449
         res_data = self._get_agg_response(es_res)
450
+        LOG.debug('@Result: %s', res_data)
328 451
         if res_data:
329 452
             # convert the response into Ceilometer Statistics format
330 453
             aggs = res_data['by_name']['buckets']
331 454
 
332
-            LOG.debug('@$Stats: %s' % json.dumps(aggs))
333
-
334 455
             def _render_stats(dim):
335 456
                 is_first = True
336 457
                 oldest_time = []
@@ -347,11 +468,28 @@ class MeterDispatcher(object):
347 468
                         period_diff = (current_time - previous_time) / 1000
348 469
                         duration_diff = (current_time - oldest_time) / 1000
349 470
                     # parses the statistics data
350
-                    _max = str(item['statistics']['max'])
351
-                    _min = str(item['statistics']['min'])
352
-                    _sum = str(item['statistics']['sum'])
353
-                    _avg = str(item['statistics']['avg'])
354
-                    _count = str(item['statistics']['count'])
471
+                    _max = 'null'
472
+                    _min = 'null'
473
+                    _sum = 'null'
474
+                    _avg = 'null'
475
+                    _count = 'null'
476
+                    if 'statistics' in item:
477
+                        _max = str(item['statistics']['max'])
478
+                        _min = str(item['statistics']['min'])
479
+                        _sum = str(item['statistics']['sum'])
480
+                        _avg = str(item['statistics']['avg'])
481
+                        _count = str(item['statistics']['count'])
482
+                    else:
483
+                        if 'average' in item:
484
+                            _avg = str(item['average']['value'])
485
+                        if 'maximum' in item:
486
+                            _max = str(item['maximum']['value'])
487
+                        if 'minimum' in item:
488
+                            _min = str(item['minimum']['value'])
489
+                        if 'count' in item:
490
+                            _count = str(item['count']['value'])
491
+                        if 'sum' in item:
492
+                            _sum = str(item['sum']['value'])
355 493
                     curr_timestamp = tu.iso8601_from_timestamp(current_time)
356 494
                     prev_timestamp = tu.iso8601_from_timestamp(previous_time)
357 495
                     old_timestamp = tu.iso8601_from_timestamp(oldest_time)

+ 3
- 17
kiloeyes/v2/elasticsearch/samples.py View File

@@ -23,7 +23,7 @@ from kiloeyes.common import es_conn
23 23
 from kiloeyes.common import kafka_conn
24 24
 from kiloeyes.common import namespace
25 25
 from kiloeyes.common import resource_api
26
-from kiloeyes.v2.elasticsearch import metrics
26
+from kiloeyes.v2.elasticsearch import meters
27 27
 
28 28
 try:
29 29
     import ujson as json
@@ -173,15 +173,8 @@ class CeilometerSampleDispatcher(object):
173 173
         LOG.debug('The samples GET request is received')
174 174
 
175 175
         # process query condition
176
-        query = []
177
-        metrics.ParamUtil.common(req, query)
178 176
         _samples_ag = self._sample_agg % {"size": self.size}
179
-        if query:
180
-            body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
181
-                    '"size":' + str(self.size) + ','
182
-                    '"aggs":' + _samples_ag + '}')
183
-        else:
184
-            body = '{"aggs":' + _samples_ag + '}'
177
+        body = meters.ParamUtil.filtering(req, _samples_ag, self.size, None)
185 178
 
186 179
         LOG.debug('Request body:' + body)
187 180
         LOG.debug('Request url:' + self._query_url)
@@ -204,15 +197,8 @@ class CeilometerSampleDispatcher(object):
204 197
         LOG.debug('The sample %s GET request is received' % sample_id)
205 198
 
206 199
         # process query condition
207
-        query = []
208
-        metrics.ParamUtil.common(req, query)
209 200
         _sample_ag = self._sample_agg % {"size": self.size}
210
-        if query:
211
-            body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
212
-                    '"size":' + str(self.size) + ','
213
-                    '"aggs":' + _sample_ag + '}')
214
-        else:
215
-            body = '{"aggs":' + _sample_ag + '}'
201
+        body = meters.ParamUtil.filtering(req, _sample_ag, self.size, None)
216 202
 
217 203
         # modify the query url to filter out name
218 204
         query_url = []

Loading…
Cancel
Save