From b93ded41ed4150468f210154c569b258411685d4 Mon Sep 17 00:00:00 2001 From: Tong Li Date: Fri, 12 Feb 2016 00:09:57 -0500 Subject: [PATCH] enable bulk message post on persister Change-Id: Ifbf2c5e1517c0e7a042b241657725640e8cdebec --- kiloeyes/common/es_conn.py | 13 ++----------- kiloeyes/microservice/metrics_fixer.py | 9 ++++++++- kiloeyes/tests/common/test_es_conn.py | 14 -------------- kiloeyes/tests/microservice/test_metrics_fixer.py | 2 +- 4 files changed, 11 insertions(+), 27 deletions(-) diff --git a/kiloeyes/common/es_conn.py b/kiloeyes/common/es_conn.py index 181b7b7..7604892 100755 --- a/kiloeyes/common/es_conn.py +++ b/kiloeyes/common/es_conn.py @@ -67,20 +67,11 @@ class ESConnection(object): if self.drop_data: return else: - # figure out id situation - _id = '' - if self.id_field: - obj = json.loads(msg) - _id = obj.get(self.id_field) - if not _id: - LOG.error('Msg does not have required id field %s' % - self.id_field) - return 400 # index may change over the time, it has to be called for each # request index = self.index_strategy.get_index() - path = '%s%s%s/%s/%s' % (self.uri, self.index_prefix, - index, self.doc_type, _id) + path = '%s%s%s/%s/_bulk' % (self.uri, self.index_prefix, + index, self.doc_type) res = requests.post(path, data=msg) LOG.debug('Msg post target=%s' % path) LOG.debug('Msg posted with response code: %s' % res.status_code) diff --git a/kiloeyes/microservice/metrics_fixer.py b/kiloeyes/microservice/metrics_fixer.py index cbff6cf..8738b22 100755 --- a/kiloeyes/microservice/metrics_fixer.py +++ b/kiloeyes/microservice/metrics_fixer.py @@ -45,7 +45,14 @@ class MetricsFixer(object): def process_msg(self, msg): try: - return MetricsFixer._add_hash(json.loads(msg)) + data = json.loads(msg) + if not isinstance(data, list): + data = [data] + result = '' + for item in data: + result += '{"index":{}}\n' + MetricsFixer._add_hash(item) + result += '\n' + return result except Exception: LOG.exception('') return '' diff --git a/kiloeyes/tests/common/test_es_conn.py b/kiloeyes/tests/common/test_es_conn.py index ef7b0c4..2038e23 100755 --- a/kiloeyes/tests/common/test_es_conn.py +++ b/kiloeyes/tests/common/test_es_conn.py @@ -43,17 +43,3 @@ class TestESConnection(tests.BaseTestCase): with mock.patch.object(requests, 'post', return_value=req_result): conn.send_messages(json.dumps(msg)) self.assertTrue(requests.post.called) - - def test_send_messages_without_id(self): - self.CONF.set_override('id_field', 'id', group='es_conn') - self.CONF.set_override('uri', 'http://fake', group='es_conn') - self.CONF.set_override('time_unit', 'h', group='timed_strategy') - strategy = timed_strategy.TimedStrategy() - conn = es_conn.ESConnection('alarms', strategy, 'pre_') - req_result = mock.Mock() - req_result.status_code = 204 - msg = {'not_id': 'whatever'} - with mock.patch.object(requests, 'post', return_value=req_result): - res = conn.send_messages(json.dumps(msg)) - self.assertFalse(requests.post.called) - self.assertEqual(res, 400) diff --git a/kiloeyes/tests/microservice/test_metrics_fixer.py b/kiloeyes/tests/microservice/test_metrics_fixer.py index 1daf4c5..5fce241 100755 --- a/kiloeyes/tests/microservice/test_metrics_fixer.py +++ b/kiloeyes/tests/microservice/test_metrics_fixer.py @@ -48,7 +48,7 @@ class TestMetricsFixer(tests.BaseTestCase): fixer = metrics_fixer.MetricsFixer() result = fixer.process_msg(json.dumps(items)) self.assertTrue(isinstance(result, str)) - self.assertFalse(result.startswith('{"index":{}}')) + self.assertTrue(result.startswith('{"index":{}}')) def test_process_msg_multiple(self): items = [{'name': 'name1', 'dimensions': {'name1': 'value1'},