diff --git a/monasca_log_api/reference/common/log_publisher.py b/monasca_log_api/reference/common/log_publisher.py index ae576dc3..b70751fc 100644 --- a/monasca_log_api/reference/common/log_publisher.py +++ b/monasca_log_api/reference/common/log_publisher.py @@ -13,9 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import falcon -import itertools - from monasca_common.kafka import producer from monasca_common.rest import utils as rest_utils from oslo_config import cfg @@ -81,36 +78,6 @@ class LogPublisher(object): ) LOG.info('Initializing LogPublisher <%s>', self) - # TODO(trebskit) caching of computed keys should be done - # TODO(trebskit) cache should have expires_at_like functionality - @staticmethod - def _build_key(tenant_it, obj=None): - """Message key builder - - Builds message key using tenant_id and dimensions (only values). - Used values are concatenated with ':' character for readability. - - :param str tenant_it: tenant id - :param dict obj: log instance - :return: key - :rtype: str - """ - - if obj is None: - obj = {} - if not (tenant_it or obj): - return '' - - str_list = [str(tenant_it)] - - dims = obj.get('dimensions', None) - if dims: - sorted_dims = sorted(dims) - for name in sorted_dims: - str_list.append(dims[name]) - - return ':'.join(filter(None, str_list)) - @staticmethod def _is_message_valid(message): """Validates message before sending. @@ -148,7 +115,6 @@ class LogPublisher(object): See also :py:class:`monasca_log_api.common.model.Envelope' :py:meth:`._is_message_valid' - :py:meth:`._build_key' :param dict|list messages: instance (or instances) of log envelope """ @@ -158,7 +124,6 @@ class LogPublisher(object): if not isinstance(messages, list): messages = [messages] - buckets = {} sent_counter = 0 to_sent_counter = len(messages) @@ -166,42 +131,18 @@ class LogPublisher(object): to_sent_counter, self._topics) try: + send_messages = [] for message in messages: if not self._is_message_valid(message): raise InvalidMessageException() - key = self._build_key(message['meta']['tenantId'], - message['log']) - msg = rest_utils.as_json(message).encode('utf8') - + msg = rest_utils.as_json(message) validation.validate_envelope_size(msg) + send_messages.append(msg) - if key not in buckets: - buckets[key] = [] - - buckets[key].append(msg) - - all_keys = buckets.keys() - LOG.debug('Publishing %d buckets of messages', len(all_keys)) - - topic_to_key = itertools.product(self._topics, all_keys) - for topic, key in topic_to_key: - - bucket = buckets.get(key) # array of messages for the same key - if not bucket: - LOG.warn('Empty bucket spotted, continue...') - continue - try: - self._kafka_publisher.publish(topic, bucket, key) - except Exception as ex: - raise falcon.HTTPServiceUnavailable('Service unavailable', - ex.message, 60) - - LOG.debug('Sent %d messages (topics=%s,key=%s)', - len(bucket), topic, key) - - # keep on track how many msgs have been sent - sent_counter += len(bucket) + for topic in self._topics: + self._kafka_publisher.publish(topic, send_messages) + sent_counter = to_sent_counter except Exception as ex: LOG.error('Failure in publishing messages to kafka') LOG.exception(ex) diff --git a/monasca_log_api/tests/test_log_publisher.py b/monasca_log_api/tests/test_log_publisher.py index 3f2882ea..2e5941f6 100644 --- a/monasca_log_api/tests/test_log_publisher.py +++ b/monasca_log_api/tests/test_log_publisher.py @@ -27,73 +27,6 @@ from monasca_log_api.tests import base EPOCH_START = datetime.datetime(1970, 1, 1) -class TestBuildKey(unittest.TestCase): - def test_should_return_empty_for_none_params(self): - self.assertFalse(log_publisher.LogPublisher._build_key(None, None)) - - def test_should_return_tenant_id_for_tenant_id_defined_1(self): - tenant_id = 'monasca' - self.assertEqual( - tenant_id, - log_publisher.LogPublisher._build_key(tenant_id, None) - ) - - def test_should_return_tenant_id_for_tenant_id_defined_2(self): - tenant_id = 'monasca' - self.assertEqual(tenant_id, - log_publisher.LogPublisher._build_key(tenant_id, {})) - - def test_should_return_ok_key_1(self): - # Evaluates if key matches value for defined tenant_id and - # application_type - tenant_id = 'monasca' - application_type = 'monasca-log-api' - log_object = { - 'dimensions': {'component': application_type} - } - expected_key = ':'.join([tenant_id, application_type]) - - self.assertEqual(expected_key, - log_publisher.LogPublisher._build_key(tenant_id, - log_object)) - - def test_should_return_ok_key_2(self): - # Evaluates if key matches value for defined tenant_id and - # application_type and single dimension - tenant_id = 'monasca' - application_type = 'monasca-log-api' - host = '/var/log/test' - log_object = { - 'dimensions': { - 'host': host, - 'component': application_type - } - } - expected_key = ':'.join([tenant_id, application_type, host]) - - self.assertEqual(expected_key, - log_publisher.LogPublisher._build_key(tenant_id, - log_object)) - - def test_should_return_ok_key_3(self): - # Evaluates if key matches value for defined tenant_id and - # application_type and two dimensions dimensions given unsorted - tenant_id = 'monasca' - component = 'monasca-log-api' - service = 'laas' - log_object = { - 'dimensions': { - 'component': component, - 'service': service - } - } - expected_key = ':'.join([tenant_id, component, service]) - - self.assertEqual(expected_key, - log_publisher.LogPublisher._build_key(tenant_id, - log_object)) - - class TestSendMessage(testing.TestBase): def setUp(self): self.conf = base.mock_config(self) @@ -169,9 +102,6 @@ class TestSendMessage(testing.TestBase): instance = log_publisher.LogPublisher() instance._kafka_publisher = kafka_producer instance.send_message({}) - expected_key = 'some_key' - instance._build_key = mock.Mock(name='_build_key', - return_value=expected_key) creation_time = ((datetime.datetime.utcnow() - EPOCH_START) .total_seconds()) @@ -199,8 +129,7 @@ class TestSendMessage(testing.TestBase): instance._kafka_publisher.publish.assert_called_once_with( self.conf.conf.log_publisher.topics[0], - [ujson.dumps(msg)], - expected_key) + [ujson.dumps(msg)]) @mock.patch('monasca_log_api.reference.common.log_publisher.producer' '.KafkaProducer') @@ -212,9 +141,6 @@ class TestSendMessage(testing.TestBase): instance = log_publisher.LogPublisher() instance._kafka_publisher = mock.Mock() instance.send_message({}) - expected_key = 'some_key' - instance._build_key = mock.Mock(name='_build_key', - return_value=expected_key) creation_time = ((datetime.datetime.utcnow() - EPOCH_START) .total_seconds()) @@ -246,8 +172,7 @@ class TestSendMessage(testing.TestBase): for topic in topics: instance._kafka_publisher.publish.assert_any_call( topic, - [json_msg], - expected_key) + [json_msg]) @mock.patch( 'monasca_log_api.reference.common.log_publisher.producer' @@ -284,7 +209,7 @@ class TestSendMessage(testing.TestBase): instance.send_message(msgs_data) - self.assertEqual(len(topics) * len(msgs_data), + self.assertEqual(len(topics), instance._kafka_publisher.publish.call_count) @mock.patch( @@ -360,5 +285,5 @@ class TestSendMessage(testing.TestBase): instance.send_message(msgs_data) - self.assertEqual(len(msgs_data), + self.assertEqual(len(topics), instance._kafka_publisher.publish.call_count)