Make sure that the index field mapping is correct

This patch set added initialization code to make sure that before
an index gets created, index template is added to ElasticSearch so
field such as timestamp will be correctly indexed as date. This
patch set also updated the dependency.

Change-Id: I8a5a77674fb172ad5c1b73a940477fa6acd30059
This commit is contained in:
Tong Li 2016-03-07 09:19:52 -05:00
parent d8f3f770e4
commit a731b9f649
7 changed files with 89 additions and 17 deletions

16
AUTHORS
View File

@ -1,12 +1,4 @@
Maintainer Chang-Yi Lee <cy.l@inwinstack.com>
---------- Jiaming Lin <robin890650@gmail.com>
OpenStack Foundation Tong Li <litong01@us.ibm.com>
IRC: #openstack-kiloeyes on irc.freenode.net spzala <spzala@us.ibm.com>
Original Authors
----------------
Tong Li (litong01@us.ibm.com)
Contributors
------------
Tong Li (litong01@us.ibm.com)

18
ChangeLog Normal file
View File

@ -0,0 +1,18 @@
CHANGES
=======
* ES now returns timestamp as milliseconds vs seconds
* enable bulk message post on persister
* Added more instructions
* bulk insert can not be done implicitly
* fix the partitions data type error
* Updated the installation instructions
* added more instruction on how to create an all-in-one kiloeyes
* unit test passed for py27
* Add Vagrant sample file to ease development environment bootstrap
* Make the server more flexible with configuration files
* remove old openstack incubator project reference
* remove old oslo.config and use new oslo_config
* Make minor modifications in the README
* seeding the project
* Added .gitreview

35
etc/metrics.template Executable file
View File

@ -0,0 +1,35 @@
{
"template":"data_*",
"order":100,
"settings":{
"number_of_shards":5
},
"mappings":{
"_default_":{
"_all":{
"enabled":false
},
"dynamic_templates":[
{
"date_template":{
"match":"timestamp",
"match_mapping_type":"date",
"mapping":{
"type":"date"
}
}
},
{
"string_template":{
"match":"*",
"match_mapping_type":"string",
"mapping":{
"type":"string",
"index":"not_analyzed"
}
}
}
]
}
}
}

View File

@ -78,7 +78,9 @@ class TestMetricDispatcher(base.BaseTestCase):
self.CONF.set_override('topic', 'fake', group='metrics') self.CONF.set_override('topic', 'fake', group='metrics')
self.CONF.set_override('doc_type', 'fake', group='metrics') self.CONF.set_override('doc_type', 'fake', group='metrics')
self.CONF.set_override('index_prefix', 'also_fake', group='metrics') self.CONF.set_override('index_prefix', 'also_fake', group='metrics')
self.CONF.set_override('uri', 'fake_es_uri', group='es_conn') self.CONF.set_override('index_template', 'etc/metrics.template',
group='metrics')
self.CONF.set_override('uri', 'http://fake_es_uri', group='es_conn')
res = mock.Mock() res = mock.Mock()
res.status_code = 200 res.status_code = 200
@ -91,9 +93,12 @@ class TestMetricDispatcher(base.BaseTestCase):
"name": {"type": "string", "index": "not_analyzed"}, "name": {"type": "string", "index": "not_analyzed"},
"timestamp": {"type": "string", "index": "not_analyzed"}, "timestamp": {"type": "string", "index": "not_analyzed"},
"value": {"type": "double"}}}}}} "value": {"type": "double"}}}}}}
put_res = mock.Mock()
put_res.status_code = '200'
with mock.patch.object(requests, 'get', with mock.patch.object(requests, 'get',
return_value=res): return_value=res):
self.dispatcher = metrics.MetricDispatcher({}) with mock.patch.object(requests, 'put', return_value=put_res):
self.dispatcher = metrics.MetricDispatcher({})
def test_initialization(self): def test_initialization(self):
# test that the kafka connection uri should be 'fake' as it was passed # test that the kafka connection uri should be 'fake' as it was passed
@ -106,11 +111,11 @@ class TestMetricDispatcher(base.BaseTestCase):
# test that the doc type of the es connection is fake # test that the doc type of the es connection is fake
self.assertEqual(self.dispatcher._es_conn.doc_type, 'fake') self.assertEqual(self.dispatcher._es_conn.doc_type, 'fake')
self.assertEqual(self.dispatcher._es_conn.uri, 'fake_es_uri/') self.assertEqual(self.dispatcher._es_conn.uri, 'http://fake_es_uri/')
# test that the query url is correctly formed # test that the query url is correctly formed
self.assertEqual(self.dispatcher._query_url, ( self.assertEqual(self.dispatcher._query_url, (
'fake_es_uri/also_fake*/fake/_search?search_type=count')) 'http://fake_es_uri/also_fake*/fake/_search?search_type=count'))
def test_post_data(self): def test_post_data(self):
with mock.patch.object(kafka_conn.KafkaConnection, 'send_messages', with mock.patch.object(kafka_conn.KafkaConnection, 'send_messages',

View File

@ -42,6 +42,8 @@ METRICS_OPTS = [
help='The index strategy used to create index name.'), help='The index strategy used to create index name.'),
cfg.StrOpt('index_prefix', default='data_', cfg.StrOpt('index_prefix', default='data_',
help='The index prefix where metrics were saved to.'), help='The index prefix where metrics were saved to.'),
cfg.StrOpt('index_template', default='/etc/kiloeyes/metrics.template',
help='The index template which metrics index should use.'),
cfg.IntOpt('size', default=10000, cfg.IntOpt('size', default=10000,
help=('The query result limit. Any result set more than ' help=('The query result limit. Any result set more than '
'the limit will be discarded. To see all the matching ' 'the limit will be discarded. To see all the matching '
@ -135,6 +137,7 @@ class MetricDispatcher(object):
super(MetricDispatcher, self).__init__() super(MetricDispatcher, self).__init__()
self.topic = cfg.CONF.metrics.topic self.topic = cfg.CONF.metrics.topic
self.doc_type = cfg.CONF.metrics.doc_type self.doc_type = cfg.CONF.metrics.doc_type
self.index_template = cfg.CONF.metrics.index_template
self.size = cfg.CONF.metrics.size self.size = cfg.CONF.metrics.size
self._kafka_conn = kafka_conn.KafkaConnection(self.topic) self._kafka_conn = kafka_conn.KafkaConnection(self.topic)
@ -207,6 +210,24 @@ class MetricDispatcher(object):
{"field":"value"}}}}}}}}} {"field":"value"}}}}}}}}}
""" """
# Setup index template
self.setup_index_template()
def setup_index_template(self):
status = '400'
with open(self.index_template) as template_file:
template_path = ''.join([self._es_conn.uri,
'/_template/metrics'])
es_res = requests.put(template_path, data=template_file.read())
status = getattr(falcon, 'HTTP_%s' % es_res.status_code)
if status == '400':
LOG.error('Metrics template can not be created. Status code %s'
% status)
exit(1)
else:
LOG.debug('Index template set successfully! Status %s' % status)
def post_data(self, req, res): def post_data(self, req, res):
LOG.debug('Getting the call.') LOG.debug('Getting the call.')
msg = req.stream.read() msg = req.stream.read()

View File

@ -12,7 +12,7 @@ oslo.i18n>=1.5.0
oslo.log>=1.0.0 oslo.log>=1.0.0
oslo.service>=0.1.0 oslo.service>=0.1.0
pastedeploy>=1.3.3 pastedeploy>=1.3.3
pbr>=0.6,!=0.7,<1.0 pbr>=1.6
python-dateutil>=1.5 python-dateutil>=1.5
six>=1.7.0 six>=1.7.0
stevedore>=0.14 stevedore>=0.14

View File

@ -33,6 +33,7 @@ data_files =
etc/alarms-persister.conf etc/alarms-persister.conf
etc/kiloeyes-notification-engine.conf etc/kiloeyes-notification-engine.conf
etc/kiloeyes-threshold-engine.conf etc/kiloeyes-threshold-engine.conf
etc/metrics.template
[entry_points] [entry_points]
console_scripts = console_scripts =