Sample Get and Get-by-id implemented
Ceilometer API Sample Get and Get_by_id implemented and unit tested. Change-Id: I1d9d7298cced643f528a6af8a5ea513343159a36
This commit is contained in:
parent
2e0a2a5e1a
commit
91e54b3388
1
AUTHORS
1
AUTHORS
|
@ -4,3 +4,4 @@ Jiaming Lin <robin890650@gmail.com>
|
|||
Tong Li <litong01@us.ibm.com>
|
||||
Xiao Tan <xt85@cornell.edu>
|
||||
spzala <spzala@us.ibm.com>
|
||||
xiaotan2 <xiaotan2@uw.edu>
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
CHANGES
|
||||
=======
|
||||
|
||||
* Sample Get and Get-by-id implemented
|
||||
* Meter Get Statistics by name implemented
|
||||
* Meter Get_Meter_Byname implemented
|
||||
* Meters GET request implemented
|
||||
* Added more instructions on how to configure keystone middleware
|
||||
|
|
|
@ -12,6 +12,7 @@ dispatcher = alarmdefinitions
|
|||
dispatcher = notificationmethods
|
||||
dispatcher = alarms
|
||||
dispatcher = meters
|
||||
dispatcher = samples
|
||||
|
||||
[metrics]
|
||||
topic = metrics
|
||||
|
|
|
@ -40,3 +40,11 @@ class V2API(object):
|
|||
    @resource_api.Restify('/v2.0/meters/{meter_name}/statistics', method='get')
    def get_meter_statistics(self, req, res, meter_name):
        """Abstract endpoint: GET statistics for *meter_name*.

        Concrete dispatchers override this; the base answers 501.
        """
        res.status = '501 Not Implemented'
||||
    @resource_api.Restify('/v2.0/samples', method='get')
    def get_samples(self, req, res):
        """Abstract endpoint: GET the sample collection.

        Concrete dispatchers override this; the base answers 501.
        """
        res.status = '501 Not Implemented'
||||
    @resource_api.Restify('/v2.0/samples/{sample_id}', method='get')
    def get_sample_byid(self, req, res, sample_id):
        """Abstract endpoint: GET a single sample by *sample_id*.

        Concrete dispatchers override this; the base answers 501.
        """
        res.status = '501 Not Implemented'
|
|
|
@ -0,0 +1,157 @@
|
|||
# Copyright 2013 IBM Corp
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import falcon
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
from oslotest import base
|
||||
import requests
|
||||
|
||||
from kiloeyes.v2.elasticsearch import samples
|
||||
|
||||
try:
|
||||
import ujson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
|
||||
class TestCeilometerSampleDispatcher(base.BaseTestCase):
    """Unit tests for CeilometerSampleDispatcher's GET endpoints.

    All network traffic (requests.get/put/post) is mocked, so the
    dispatcher is exercised purely in-process against a canned
    Elasticsearch aggregation response.
    """

    def setUp(self):
        """Build a dispatcher wired to fake config and mocked HTTP calls."""
        super(TestCeilometerSampleDispatcher, self).setUp()
        self.CONF = self.useFixture(fixture_config.Config()).conf
        # Fake out every external endpoint the dispatcher touches.
        self.CONF.set_override('uri', 'fake_url', group='kafka_opts')
        self.CONF.set_override('topic', 'fake', group='samples')
        self.CONF.set_override('doc_type', 'fake', group='samples')
        self.CONF.set_override('index_prefix', 'also_fake', group='samples')
        self.CONF.set_override('index_template', 'etc/metrics.template',
                               group='samples')
        self.CONF.set_override('uri', 'http://fake_es_uri', group='es_conn')

        # Canned mapping returned when the dispatcher probes the index
        # mappings via requests.get during construction.
        res = mock.Mock()
        res.status_code = 200
        res.json.return_value = {"data": {"mappings": {"fake": {
            "properties": {
                "dimensions": {"properties": {
                    "key1": {"type": "long"}, "key2": {"type": "long"},
                    "rkey0": {"type": "long"}, "rkey1": {"type": "long"},
                    "rkey2": {"type": "long"}, "rkey3": {"type": "long"}}},
                "name": {"type": "string", "index": "not_analyzed"},
                "timestamp": {"type": "string", "index": "not_analyzed"},
                "value": {"type": "double"}}}}}}
        # The index-template PUT in setup_index_template must "succeed".
        put_res = mock.Mock()
        put_res.status_code = '200'
        with mock.patch.object(requests, 'get',
                               return_value=res):
            with mock.patch.object(requests, 'put', return_value=put_res):
                self.dispatcher = samples.CeilometerSampleDispatcher({})

        # Canned ES aggregation response: one metric name bucket
        # ("BABMGD") holding one dimensions bucket with a single hit.
        self.response_str = """
        {"aggregations":{"by_name":{"doc_count_error_upper_bound":0,
        "sum_other_doc_count":0,"buckets":[{"key":"BABMGD","doc_count":300,
        "by_dim":{"buckets":[{"key": "64e6ce08b3b8547b7c32e5cfa5b7d81f",
        "doc_count":300,"samples":{"hits":{"hits":[{ "_type": "metrics",
        "_id": "AVOziWmP6-pxt0dRmr7j", "_index": "data_20160401000000",
        "_source":{"name":"BABMGD", "value": 4,
        "timestamp": 1461337094000,
        "dimensions_hash": "0afdb86f508962bb5d8af52df07ef35a",
        "project_id": "35b17138-b364-4e6a-a131-8f3099c5be68",
        "tenant_id": "bd9431c1-8d69-4ad3-803a-8d4a6b89fd36",
        "user_agent": "openstack", "dimensions": null,
        "user": "admin", "value_meta": null, "tenant": "admin",
        "user_id": "efd87807-12d2-4b38-9c70-5f5c2ac427ff"}}]}}}]}}]}}}
        """

    def test_initialization(self):
        """Constructor should propagate config into its connections."""
        # test that the kafka connection uri should be 'fake' as it was passed
        # in from configuration
        self.assertEqual(self.dispatcher._kafka_conn.uri, 'fake_url')

        # test that the topic is samples as it was passed into dispatcher
        self.assertEqual(self.dispatcher._kafka_conn.topic, 'fake')

        # test that the doc type of the es connection is fake
        self.assertEqual(self.dispatcher._es_conn.doc_type, 'fake')

        self.assertEqual(self.dispatcher._es_conn.uri, 'http://fake_es_uri/')

        # test that the query url is correctly formed
        self.assertEqual(self.dispatcher._query_url, (
            'http://fake_es_uri/also_fake*/fake/_search?search_type=count'))

    def test_get_samples(self):
        """GET /v2.0/samples renders the ES aggregation as samples."""
        res = mock.Mock()
        req = mock.Mock()

        def _side_effect(arg):
            # Simulated query-string parameters on the request.
            if arg == 'name':
                return 'tongli'
            elif arg == 'dimensions':
                return 'key1:100, key2:200'
        req.get_param.side_effect = _side_effect

        req_result = mock.Mock()

        req_result.json.return_value = json.loads(self.response_str)
        req_result.status_code = 200

        with mock.patch.object(requests, 'post', return_value=req_result):
            self.dispatcher.get_samples(req, res)

        # test that the response code is 200
        self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
        obj = json.loads(res.body)
        self.assertEqual(obj[0]['meter'], 'BABMGD')
        self.assertEqual(obj[0]['id'], 'AVOziWmP6-pxt0dRmr7j')
        self.assertEqual(obj[0]['type'], 'metrics')
        self.assertEqual(obj[0]['user_id'],
                         'efd87807-12d2-4b38-9c70-5f5c2ac427ff')
        self.assertEqual(obj[0]['project_id'],
                         '35b17138-b364-4e6a-a131-8f3099c5be68')
        self.assertEqual(obj[0]['timestamp'], 1461337094000)
        self.assertEqual(obj[0]['volume'], 4)
        self.assertEqual(len(obj), 1)

    def test_get_sample_byid(self):
        """GET /v2.0/samples/{id} renders the single matching sample."""
        res = mock.Mock()
        req = mock.Mock()

        def _side_effect(arg):
            # Simulated query-string parameters on the request.
            if arg == 'name':
                return 'tongli'
            elif arg == 'dimensions':
                return 'key1:100, key2:200'
        req.get_param.side_effect = _side_effect

        req_result = mock.Mock()

        req_result.json.return_value = json.loads(self.response_str)
        req_result.status_code = 200

        with mock.patch.object(requests, 'post', return_value=req_result):
            self.dispatcher.get_sample_byid(req, res, "AVOziWmP6-pxt0dRmr7j")

        # test that the response code is 200
        self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
        obj = json.loads(res.body)
        self.assertEqual(obj[0]['meter'], 'BABMGD')
        self.assertEqual(obj[0]['id'], 'AVOziWmP6-pxt0dRmr7j')
        self.assertEqual(obj[0]['type'], 'metrics')
        self.assertEqual(obj[0]['user_id'],
                         'efd87807-12d2-4b38-9c70-5f5c2ac427ff')
        self.assertEqual(obj[0]['project_id'],
                         '35b17138-b364-4e6a-a131-8f3099c5be68')
        self.assertEqual(obj[0]['timestamp'], 1461337094000)
        self.assertEqual(obj[0]['volume'], 4)
        self.assertEqual(len(obj), 1)
|
|
@ -0,0 +1,238 @@
|
|||
# Copyright 2013 IBM Corp
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import falcon
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import requests
|
||||
from stevedore import driver
|
||||
|
||||
from kiloeyes.common import es_conn
|
||||
from kiloeyes.common import kafka_conn
|
||||
from kiloeyes.common import namespace
|
||||
from kiloeyes.common import resource_api
|
||||
from kiloeyes.v2.elasticsearch import metrics
|
||||
|
||||
try:
|
||||
import ujson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
# Configuration knobs for the samples dispatcher, registered under the
# [samples] group.
SAMPLES_OPTS = [
    cfg.StrOpt('topic', default='metrics',
               help='The topic that samples will be published to.'),
    cfg.StrOpt('doc_type', default='metrics',
               help='The doc type that samples will be saved into.'),
    cfg.StrOpt('index_strategy', default='fixed',
               help='The index strategy used to create index name.'),
    cfg.StrOpt('index_prefix', default='data_',
               help='The index prefix where samples were saved to.'),
    cfg.StrOpt('index_template', default='/etc/kiloeyes/metrics.template',
               help='The index template which samples index should use.'),
    cfg.IntOpt('size', default=10000,
               help=('The query result limit. Any result set more than '
                     'the limit will be discarded. To see all the matching '
                     'result, narrow your search by using a small time '
                     'window or strong matching name')),
]

cfg.CONF.register_opts(SAMPLES_OPTS, group="samples")

# Module-level logger.
LOG = log.getLogger(__name__)

# Fixed "updated" timestamp string advertised by this API version.
UPDATED = str(datetime.datetime(2014, 1, 1, 0, 0, 0))
||||
class CeilometerSampleDispatcher(object):
    """Serve Ceilometer v2 sample GET requests from Elasticsearch.

    Samples are fetched with a two-level terms aggregation (metric name,
    then dimensions hash) using ``top_hits`` to pull one representative
    document per group, then re-rendered into the Ceilometer v2 sample
    JSON format.
    """

    def __init__(self, global_conf):
        """Wire up the kafka/ES connections and pre-build query strings.

        :param global_conf: accepted for dispatcher-loader compatibility;
            unused here (all settings come from cfg.CONF.samples).
        """
        LOG.debug('initializing V2API!')
        super(CeilometerSampleDispatcher, self).__init__()
        self.topic = cfg.CONF.samples.topic
        self.doc_type = cfg.CONF.samples.doc_type
        self.index_template = cfg.CONF.samples.index_template
        self.size = cfg.CONF.samples.size
        self._kafka_conn = kafka_conn.KafkaConnection(self.topic)

        # load index strategy
        if cfg.CONF.samples.index_strategy:
            self.index_strategy = driver.DriverManager(
                namespace.STRATEGY_NS,
                cfg.CONF.samples.index_strategy,
                invoke_on_load=True,
                invoke_kwds={}).driver
            LOG.debug(dir(self.index_strategy))
        else:
            self.index_strategy = None

        self.index_prefix = cfg.CONF.samples.index_prefix

        self._es_conn = es_conn.ESConnection(
            self.doc_type, self.index_strategy, self.index_prefix)

        # Setup the get samples query body pattern
        self._query_body = {
            "query": {"bool": {"must": []}},
            "size": self.size}

        self._aggs_body = {}
        self._stats_body = {}
        self._sort_clause = []

        # Setup the get samples query url, the url should be similar to this:
        # http://host:port/data_20141201/metrics/_search
        # the url should be made of es_conn uri, the index prefix, samples
        # dispatcher topic, then add the key word _search.
        self._query_url = ''.join([self._es_conn.uri,
                                   self._es_conn.index_prefix, '*/',
                                   cfg.CONF.samples.topic,
                                   '/_search?search_type=count'])

        # Setup sample query aggregation command. To see the structure of
        # the aggregation, copy and paste it to a json formatter.
        self._sample_agg = """
        {"by_name":{"terms":{"field":"name","size":%(size)d},
        "aggs":{"by_dim":{"terms":{"field":"dimensions_hash","size":%(size)d},
        "aggs":{"samples":{"top_hits":{"_source":{"exclude":
        ["dimensions_hash"]},"size":1}}}}}}}
        """

        self.setup_index_template()

    def setup_index_template(self):
        """PUT the index template to Elasticsearch; abort on failure.

        Bug fix: the original initialized ``status = '400'`` and compared
        ``status == '400'``, but ``getattr(falcon, 'HTTP_%s' % code)``
        yields strings such as ``'400 Bad Request'``, so the failure
        branch could never be taken.  Failure is now detected from the
        raw HTTP status code (anything outside the 2xx range).
        """
        with open(self.index_template) as template_file:
            template_path = ''.join([self._es_conn.uri,
                                     '/_template/metrics'])
            es_res = requests.put(template_path, data=template_file.read())
        status = getattr(falcon, 'HTTP_%s' % es_res.status_code)

        # str() keeps this working whether status_code is an int (real
        # requests) or a string (as some tests stub it).
        if not str(es_res.status_code).startswith('2'):
            LOG.error('Metrics template can not be created. Status code %s'
                      % status)
            exit(1)
        else:
            LOG.debug('Index template set successfully! Status %s' % status)

    def _get_agg_response(self, res):
        """Return the 'aggregations' object from an ES response, or None.

        :param res: a requests response; None or non-200 yields None.
        """
        if res and res.status_code == 200:
            obj = res.json()
            if obj:
                return obj.get('aggregations')
            return None
        else:
            return None

    def _render_hits(self, item, flag):
        """Render one dimensions-bucket into a ceilometer sample JSON object.

        :param item: a 'by_dim' bucket holding exactly one top_hits hit.
        :param flag: mutable dict with key 'is_first'; used to decide
            whether to prepend a comma so the stream forms a JSON array.
        """
        _id = item['samples']['hits']['hits'][0]['_id']
        _type = item['samples']['hits']['hits'][0]['_type']
        _source = item['samples']['hits']['hits'][0]['_source']
        rslt = ('{"id":' + json.dumps(_id) + ','
                '"metadata":' + json.dumps(_source['dimensions']) + ','
                '"meter":' + json.dumps(_source['name']) + ','
                '"project_id":' +
                json.dumps(_source['project_id']) + ','
                '"recorded_at":' +
                json.dumps(_source['timestamp']) + ','
                '"resource_id":' +
                json.dumps(_source['tenant_id']) + ','
                '"source":' + json.dumps(_source['user_agent']) + ','
                '"timestamp":' + json.dumps(_source['timestamp']) + ','
                '"type":' + json.dumps(_type) + ','
                '"unit":null,'
                '"user_id":' + json.dumps(_source['user_id']) + ','
                '"volume":' + json.dumps(_source['value']) + '}')
        if flag['is_first']:
            flag['is_first'] = False
            return rslt
        else:
            return ',' + rslt

    def _make_body(self, buckets):
        """Yield the pieces of a JSON array of samples from name buckets."""
        flag = {'is_first': True}
        yield '['
        for by_name in buckets:
            if by_name['by_dim']:
                for by_dim in by_name['by_dim']['buckets']:
                    yield self._render_hits(by_dim, flag)
        yield ']'

    def _dispatch_query(self, query_url, body, res):
        """POST *body* to *query_url* and render the result into *res*.

        Shared tail of get_samples/get_sample_byid: issues the ES search,
        mirrors the ES status code onto the falcon response, and renders
        the aggregation buckets into ceilometer sample format (empty body
        when there is no aggregation data).
        """
        LOG.debug('Request body:' + body)
        LOG.debug('Request url:' + query_url)
        es_res = requests.post(query_url, data=body)
        res.status = getattr(falcon, 'HTTP_%s' % es_res.status_code)

        LOG.debug('Query to ElasticSearch returned: %s' % es_res.status_code)
        res_data = self._get_agg_response(es_res)
        if res_data:
            # convert the response into ceilometer sample format
            aggs = res_data['by_name']['buckets']

            res.body = ''.join(self._make_body(aggs))
            res.content_type = 'application/json;charset=utf-8'
        else:
            res.body = ''

    @resource_api.Restify('/v2.0/samples', method='get')
    def get_samples(self, req, res):
        """Handle GET /v2.0/samples: list samples matching the query."""
        LOG.debug('The samples GET request is received')

        # process query condition
        query = []
        metrics.ParamUtil.common(req, query)
        _samples_ag = self._sample_agg % {"size": self.size}
        if query:
            body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
                    '"size":' + str(self.size) + ','
                    '"aggs":' + _samples_ag + '}')
        else:
            body = '{"aggs":' + _samples_ag + '}'

        self._dispatch_query(self._query_url, body, res)

    @resource_api.Restify('/v2.0/samples/{sample_id}', method='get')
    def get_sample_byid(self, req, res, sample_id):
        """Handle GET /v2.0/samples/{sample_id}: fetch one sample by id."""
        LOG.debug('The sample %s GET request is received' % sample_id)

        # process query condition
        query = []
        metrics.ParamUtil.common(req, query)
        _sample_ag = self._sample_agg % {"size": self.size}
        if query:
            body = ('{"query":{"bool":{"must":' + json.dumps(query) + '}},'
                    '"size":' + str(self.size) + ','
                    '"aggs":' + _sample_ag + '}')
        else:
            body = '{"aggs":' + _sample_ag + '}'

        # narrow the search to the requested sample id (the original
        # initialized query_url as a list before rebinding to a string,
        # and its comment wrongly said "filter out name")
        if sample_id:
            query_url = self._query_url + '&q=_id:' + sample_id
        else:
            query_url = self._query_url
        self._dispatch_query(query_url, body, res)
|
|
@ -51,6 +51,7 @@ kiloeyes.dispatcher =
|
|||
notificationmethods = kiloeyes.v2.elasticsearch.notificationmethods:NotificationMethodDispatcher
|
||||
alarms = kiloeyes.v2.elasticsearch.alarms:AlarmDispatcher
|
||||
meters = kiloeyes.v2.elasticsearch.meters:MeterDispatcher
|
||||
samples = kiloeyes.v2.elasticsearch.samples:CeilometerSampleDispatcher
|
||||
|
||||
kiloeyes.index.strategy =
|
||||
timed = kiloeyes.microservice.timed_strategy:TimedStrategy
|
||||
|
|
Loading…
Reference in New Issue