diff --git a/documentation/monasca_log_api.reference.v3.rst b/documentation/monasca_log_api.reference.v3.rst index a4e9eb42..577d62b2 100644 --- a/documentation/monasca_log_api.reference.v3.rst +++ b/documentation/monasca_log_api.reference.v3.rst @@ -17,3 +17,9 @@ monasca_log_api.reference.v3 module :undoc-members: :show-inheritance: +.. automodule:: monasca_log_api.reference.v3.common.bulk_processor + :members: + :undoc-members: + :show-inheritance: + + diff --git a/monasca_log_api/reference/common/log_publisher.py b/monasca_log_api/reference/common/log_publisher.py index 3fdccca6..bdd72f4b 100644 --- a/monasca_log_api/reference/common/log_publisher.py +++ b/monasca_log_api/reference/common/log_publisher.py @@ -23,6 +23,7 @@ from oslo_log import log from monasca_log_api.monitoring import client from monasca_log_api.monitoring import metrics +from monasca_log_api.reference.common import model LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -55,9 +56,6 @@ log_publisher_group = cfg.OptGroup(name='log_publisher', title='log_publisher') cfg.CONF.register_group(log_publisher_group) cfg.CONF.register_opts(log_publisher_opts, log_publisher_group) -ENVELOPE_SCHEMA = ['log', 'meta', 'creation_time'] -"""Log envelope (i.e.) message keys""" - class InvalidMessageException(Exception): pass @@ -83,7 +81,10 @@ class LogPublisher(object): """ def __init__(self): + self._topics = CONF.log_publisher.topics + self.max_message_size = CONF.log_publisher.max_message_size + self._kafka_publisher = producer.KafkaProducer( url=CONF.log_publisher.kafka_url ) @@ -106,87 +107,6 @@ class LogPublisher(object): LOG.info('Initializing LogPublisher <%s>', self) - @staticmethod - def _is_message_valid(message): - """Validates message before sending. - - Methods checks if message is :py:class:`dict`. - If so dictionary is verified against having following keys: - - * meta - * log - * creation_time - - If keys are found, each key must have a valueH. - - If at least none of the conditions is met - :py:class:`.InvalidMessageException` is raised - - :raises InvalidMessageException: if message does not comply to schema - - """ - if not isinstance(message, dict): - return False - - for key in ENVELOPE_SCHEMA: - if not (key in message and message.get(key)): - return False - - return True - - def _truncate(self, envelope): - """Truncates the message if needed. - - Each message send to kafka is verified. - Method checks if message serialized to json - exceeds maximum allowed size that can be posted to kafka - queue. If so, method truncates message property of the log - by difference between message and allowed size. - - :param Envelope envelope: envelope to check - :return: truncated message if size is exceeded, otherwise message - is left unmodified - """ - - msg_str = rest_utils.as_json(envelope) - - max_size = CONF.log_publisher.max_message_size - envelope_size = ((len(bytearray(msg_str)) + - _TIMESTAMP_KEY_SIZE + - _KAFKA_META_DATA_SIZE) - if msg_str is not None else -1) - - size_diff = (envelope_size - max_size) + _TRUNCATION_SAFE_OFFSET - - LOG.debug('_truncate(max_message_size=%d, message_size=%d, diff=%d)', - max_size, envelope_size, size_diff) - - if size_diff > 1: - truncate_by = size_diff + _TRUNCATED_PROPERTY_SIZE - - self._logs_truncated_gauge.send( - name=None, - value=truncate_by - ) - - LOG.warn(('Detected message that exceeds %d bytes,' - 'message will be truncated by %d bytes'), - max_size, - truncate_by) - - log_msg = envelope['log']['message'] - truncated_log_msg = log_msg[:-truncate_by] - - envelope['log']['truncated'] = True - envelope['log']['message'] = truncated_log_msg - - # will just transform message once again without truncation - return rest_utils.as_json(envelope) - - self._logs_truncated_gauge.send(name=None, value=0) - - return msg_str - def send_message(self, messages): """Sends message to each configured topic. @@ -194,8 +114,8 @@ class LogPublisher(object): Falsy messages (i.e. empty) are not shipped to kafka See also - :py:class:`monasca_log_api.common.model.Envelope' - :py:meth:`._is_message_valid' + * :py:class:`monasca_log_api.common.model.Envelope` + * :py:meth:`._is_message_valid` :param dict|list messages: instance (or instances) of log envelope """ @@ -206,18 +126,16 @@ class LogPublisher(object): messages = [messages] sent_counter = 0 - to_sent_counter = len(messages) + num_of_msgs = len(messages) LOG.debug('About to publish %d messages to %s topics', - to_sent_counter, self._topics) + num_of_msgs, self._topics) try: send_messages = [] - for message in messages: - if not self._is_message_valid(message): - raise InvalidMessageException() - msg = self._truncate(message) + for message in messages: + msg = self._transform_message(message) send_messages.append(msg) with self._publish_time_ms.time(name=None): @@ -229,24 +147,80 @@ class LogPublisher(object): LOG.exception(ex) raise ex finally: - self._logs_published_counter.increment(value=sent_counter) - if sent_counter == to_sent_counter: - LOG.info('Successfully published all [%d] messages', - sent_counter) - else: - failed_to_send = to_sent_counter - sent_counter - error_str = ('Failed to sent all messages, %d ' - 'messages out of %d have not been published') - LOG.error(error_str, failed_to_send, to_sent_counter) + self._after_publish(sent_counter, num_of_msgs) - self._logs_lost_counter.increment( - value=failed_to_send - ) + def _transform_message(self, message): + """Transforms message into JSON. + + Method executes transformation operation for + single element. Operation is set of following + operations: + + * checking if message is valid + (:py:func:`.LogPublisher._is_message_valid`) + * truncating message if necessary + (:py:func:`.LogPublisher._truncate`) + + :param model.Envelope message: instance of message + :return: serialized message + :rtype: str + """ + if not self._is_message_valid(message): + raise InvalidMessageException() + return self._truncate(message) + + def _truncate(self, envelope): + """Truncates the message if needed. + + Each message send to kafka is verified. + Method checks if message serialized to json + exceeds maximum allowed size that can be posted to kafka + queue. If so, method truncates message property of the log + by difference between message and allowed size. + + :param Envelope envelope: original envelope + :return: serialized message + :rtype: str + """ + + msg_str = rest_utils.as_json(envelope) + envelope_size = ((len(bytearray(msg_str)) + + _TIMESTAMP_KEY_SIZE + + _KAFKA_META_DATA_SIZE) + if msg_str is not None else -1) + + diff_size = ((envelope_size - self.max_message_size) + + _TRUNCATION_SAFE_OFFSET) + + if diff_size > 1: + truncated_by = diff_size + _TRUNCATED_PROPERTY_SIZE + + LOG.warn(('Detected message that exceeds %d bytes,' + 'message will be truncated by %d bytes'), + self.max_message_size, + truncated_by) + + log_msg = envelope['log']['message'] + truncated_log_msg = log_msg[:-truncated_by] + + envelope['log']['truncated'] = True + envelope['log']['message'] = truncated_log_msg + self._logs_truncated_gauge.send(name=None, value=truncated_by) + + msg_str = rest_utils.as_json(envelope) + else: + self._logs_truncated_gauge.send(name=None, value=0) + + return msg_str def _publish(self, messages): - to_sent = len(messages) + """Publishes messages to kafka. - LOG.debug('Publishing %d messages', to_sent) + :param list messages: list of messages + """ + num_of_msg = len(messages) + + LOG.debug('Publishing %d messages', num_of_msg) try: for topic in self._topics: @@ -254,7 +228,39 @@ class LogPublisher(object): topic, messages ) - LOG.debug('Sent %d messages to topic %s', to_sent, topic) + LOG.debug('Sent %d messages to topic %s', num_of_msg, topic) except Exception as ex: raise falcon.HTTPServiceUnavailable('Service unavailable', ex.message, 60) + + @staticmethod + def _is_message_valid(message): + """Validates message before sending. + + Methods checks if message is :py:class:`model.Envelope`. + By being instance of this class it is ensured that all required + keys are found and they will have their values. + + """ + return message and isinstance(message, model.Envelope) + + def _after_publish(self, send_count, to_send_count): + """Executed after publishing to sent metrics. + + :param int send_count: how many messages have been sent + :param int to_send_count: how many messages should be sent + + """ + + failed_to_send = to_send_count - send_count + + if failed_to_send == 0: + LOG.debug('Successfully published all [%d] messages', + send_count) + else: + error_str = ('Failed to send all messages, %d ' + 'messages out of %d have not been published') + LOG.error(error_str, failed_to_send, to_send_count) + + self._logs_published_counter.increment(value=send_count) + self._logs_lost_counter.increment(value=failed_to_send) diff --git a/monasca_log_api/reference/common/validation.py b/monasca_log_api/reference/common/validation.py index 943184fe..bbbd8414 100644 --- a/monasca_log_api/reference/common/validation.py +++ b/monasca_log_api/reference/common/validation.py @@ -84,23 +84,8 @@ def validate_application_type(application_type=None): validate_match() -def validate_dimensions(dimensions): - """Validates dimensions type. - - Empty dimensions are not being validated. - For details see: - - :param dict dimensions: dimensions to validate - - * :py:data:`DIMENSION_NAME_CONSTRAINTS` - * :py:data:`DIMENSION_VALUE_CONSTRAINTS` - """ - - def validate_name(name): - if not name: - raise exceptions.HTTPUnprocessableEntity( - 'Dimension name cannot be empty' - ) +def _validate_dimension_name(name): + try: if len(name) > DIMENSION_NAME_CONSTRAINTS['MAX_LENGTH']: raise exceptions.HTTPUnprocessableEntity( 'Dimension name %s must be 255 characters or less' % @@ -116,26 +101,42 @@ def validate_dimensions(dimensions): 'Dimension name %s may not contain: %s' % (name, '> < = { } ( ) \' " , ; &') ) + except (TypeError, IndexError): + raise exceptions.HTTPUnprocessableEntity( + 'Dimension name cannot be empty' + ) - def validate_value(value): - if not value: - raise exceptions.HTTPUnprocessableEntity( - 'Dimension value cannot be empty' - ) + +def _validate_dimension_value(value): + try: + value[0] if len(value) > DIMENSION_VALUE_CONSTRAINTS['MAX_LENGTH']: raise exceptions.HTTPUnprocessableEntity( 'Dimension value %s must be 255 characters or less' % value ) + except (TypeError, IndexError): + raise exceptions.HTTPUnprocessableEntity( + 'Dimension value cannot be empty' + ) - if (isinstance(dimensions, dict) and not - isinstance(dimensions, basestring)): - for dim_name in dimensions: - validate_name(dim_name) - validate_value(dimensions[dim_name]) +def validate_dimensions(dimensions): + """Validates dimensions type. - else: + Empty dimensions are not being validated. + For details see: + + :param dict dimensions: dimensions to validate + + * :py:data:`DIMENSION_NAME_CONSTRAINTS` + * :py:data:`DIMENSION_VALUE_CONSTRAINTS` + """ + try: + for dim_name, dim_value in dimensions.iteritems(): + _validate_dimension_name(dim_name) + _validate_dimension_value(dim_value) + except AttributeError: raise exceptions.HTTPUnprocessableEntity( 'Dimensions %s must be a dictionary (map)' % dimensions) diff --git a/monasca_log_api/reference/v3/common/bulk_processor.py b/monasca_log_api/reference/v3/common/bulk_processor.py new file mode 100644 index 00000000..ad50cc3e --- /dev/null +++ b/monasca_log_api/reference/v3/common/bulk_processor.py @@ -0,0 +1,154 @@ +# Copyright 2016 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_log import log + +from monasca_log_api.reference.common import log_publisher +from monasca_log_api.reference.common import model +from monasca_log_api.reference.common import validation + +LOG = log.getLogger(__name__) +CONF = cfg.CONF + + +class BulkProcessor(log_publisher.LogPublisher): + """BulkProcessor for effective log processing and publishing. + + BulkProcessor is customized version of + :py:class:`monasca_log_api.reference.common.log_publisher.LogPublisher` + that utilizes processing of bulk request inside single loop. + + """ + + def __init__(self, logs_in_counter, logs_rejected_counter): + """Initializes BulkProcessor. + + :param logs_in_counter: V3 received logs counter + :param logs_rejected_counter: V3 rejected logs counter + """ + super(BulkProcessor, self).__init__() + + assert logs_in_counter is not None + assert logs_rejected_counter is not None + + self._logs_in_counter = logs_in_counter + self._logs_rejected_counter = logs_rejected_counter + + self.service_region = CONF.service.region + + def send_message(self, logs, global_dimensions=None, log_tenant_id=None): + """Sends bulk package to kafka + + :param list logs: received logs + :param dict global_dimensions: global dimensions for each log + :param str log_tenant_id: tenant who sent logs + """ + + num_of_msgs = len(logs) if logs else 0 + sent_count = 0 + to_send_msgs = [] + + LOG.debug('Bulk package ', + num_of_msgs, global_dimensions, log_tenant_id) + + try: + for log_el in logs: + t_el = self._transform_message(log_el, + global_dimensions, + log_tenant_id) + if t_el: + to_send_msgs.append(t_el) + + with self._publish_time_ms.time(name=None): + self._publish(to_send_msgs) + sent_count = len(to_send_msgs) + + except Exception as ex: + LOG.error('Failed to send bulk package ', + num_of_msgs, global_dimensions) + LOG.exception(ex) + raise ex + finally: + self._update_counters(len(to_send_msgs), num_of_msgs) + self._after_publish(sent_count, len(to_send_msgs)) + + def _update_counters(self, in_counter, to_send_counter): + rejected_counter = to_send_counter - in_counter + + self._logs_in_counter.increment(value=in_counter) + self._logs_rejected_counter.increment(value=rejected_counter) + + def _transform_message(self, log_element, *args): + try: + validation.validate_log_message(log_element) + + log_envelope = model.Envelope.new_envelope( + log=log_element, + tenant_id=args[1], + region=self.service_region, + dimensions=self._get_dimensions(log_element, + global_dims=args[0]) + ) + + msg_payload = (super(BulkProcessor, self) + ._transform_message(log_envelope)) + + return msg_payload + except Exception as ex: + LOG.error('Log transformation failed, rejecting log') + LOG.exception(ex) + + return None + + def _create_envelope(self, log_element, tenant_id, dimensions=None): + """Create a log envelope. + + :param dict log_element: raw log element + :param str tenant_id: tenant who sent logs + :param dict dimensions: log dimensions + :return: log envelope + :rtype: model.Envelope + + """ + return + + def _get_dimensions(self, log_element, global_dims=None): + """Get the dimensions of log element. + + If global dimensions are specified and passed to this method, + both instances are merged with each other. + + If neither is specified empty dictionary is returned. + + If only local dimensions are specified they are returned without any + additional operations. The last statement applies also + to global dimensions. + + :param dict log_element: raw log instance + :param dict global_dims: global dimensions or None + :return: local dimensions merged with global dimensions + :rtype: dict + """ + local_dims = log_element.get('dimensions', {}) + + if not global_dims: + global_dims = {} + if local_dims: + validation.validate_dimensions(local_dims) + + dimensions = global_dims.copy() + dimensions.update(local_dims) + + return dimensions diff --git a/monasca_log_api/reference/v3/common/helpers.py b/monasca_log_api/reference/v3/common/helpers.py index cf157f1c..de4d99fe 100644 --- a/monasca_log_api/reference/v3/common/helpers.py +++ b/monasca_log_api/reference/v3/common/helpers.py @@ -19,6 +19,9 @@ import falcon from monasca_common.rest import utils as rest_utils from oslo_log import log +from monasca_log_api.api import exceptions +from monasca_log_api.reference.common import validation + LOG = log.getLogger(__name__) @@ -37,3 +40,18 @@ def read_json_msg_body(req): LOG.debug(ex) raise falcon.HTTPBadRequest('Bad request', 'Request body is not valid JSON') + + +def get_global_dimensions(request_body): + """Get the top level dimensions in the HTTP request body.""" + global_dims = request_body.get('dimensions', {}) + validation.validate_dimensions(global_dims) + return global_dims + + +def get_logs(request_body): + """Get the logs in the HTTP request body.""" + if 'logs' not in request_body: + raise exceptions.HTTPUnprocessableEntity( + 'Unprocessable Entity Logs not found') + return request_body['logs'] diff --git a/monasca_log_api/reference/v3/logs.py b/monasca_log_api/reference/v3/logs.py index e2c6d3b4..081b563c 100644 --- a/monasca_log_api/reference/v3/logs.py +++ b/monasca_log_api/reference/v3/logs.py @@ -21,9 +21,8 @@ from monasca_log_api.api import exceptions from monasca_log_api.api import headers from monasca_log_api.api import logs_api from monasca_log_api.monitoring import metrics -from monasca_log_api.reference.common import log_publisher -from monasca_log_api.reference.common import model from monasca_log_api.reference.common import validation +from monasca_log_api.reference.v3.common import bulk_processor from monasca_log_api.reference.v3.common import helpers LOG = log.getLogger(__name__) @@ -37,8 +36,11 @@ class Logs(logs_api.LogsApi): def __init__(self): super(Logs, self).__init__() - self._log_publisher = log_publisher.LogPublisher() + self._processor = bulk_processor.BulkProcessor( + logs_in_counter=self._logs_in_counter, + logs_rejected_counter=self._logs_rejected_counter + ) self._bulks_rejected_counter = self._statsd.get_counter( name=metrics.LOGS_BULKS_REJECTED_METRIC, dimensions=self._metrics_dimensions @@ -77,91 +79,29 @@ class Logs(logs_api.LogsApi): self._logs_size_gauge.send(name=None, value=int(req.content_length)) - envelopes = [] try: - for log_element in log_list: - LOG.trace('Processing log %s', log_element) - - validation.validate_log_message(log_element) - - dimensions = self._get_dimensions(log_element, - global_dimensions) - envelope = self._create_log_envelope(tenant_id, - cross_tenant_id, - dimensions, - log_element) - envelopes.append(envelope) - - LOG.trace('Log %s processed into envelope %s', - log_element, - envelope) + self._processor.send_message( + logs=log_list, + global_dimensions=global_dimensions, + log_tenant_id=tenant_id if tenant_id else cross_tenant_id + ) except Exception as ex: - LOG.error('Failed to process log %s', log_element) - LOG.exception(ex) res.status = getattr(ex, 'status', falcon.HTTP_500) return - finally: - rejected_logs = len(envelopes) - len(log_list) - # if entire bulk is rejected because of single error - # that means only one counter must be called - if rejected_logs < 0: - self._logs_rejected_counter.increment(value=len(log_list)) - else: - self._logs_in_counter.increment(value=len(log_list)) - # at this point only possible metrics regard - # publishing phase - self._send_logs(envelopes) res.status = falcon.HTTP_204 - def _get_dimensions(self, log_element, global_dims): - """Get the dimensions in the log element.""" - local_dims = log_element.get('dimensions', {}) - if local_dims: - validation.validate_dimensions(local_dims) - if global_dims: - dimensions = global_dims.copy() - dimensions.update(local_dims) - else: - dimensions = local_dims - else: - dimensions = global_dims - - return dimensions - - def _get_global_dimensions(self, request_body): + @staticmethod + def _get_global_dimensions(request_body): """Get the top level dimensions in the HTTP request body.""" global_dims = request_body.get('dimensions', {}) validation.validate_dimensions(global_dims) return global_dims - def _get_logs(self, request_body): + @staticmethod + def _get_logs(request_body): """Get the logs in the HTTP request body.""" if 'logs' not in request_body: raise exceptions.HTTPUnprocessableEntity( 'Unprocessable Entity Logs not found') return request_body['logs'] - - def _create_log_envelope(self, - tenant_id, - cross_tenant_id, - dimensions=None, - log_element=None): - """Create a log envelope and return it as a json string.""" - - envelope = model.Envelope.new_envelope( - log=log_element, - tenant_id=tenant_id if tenant_id else cross_tenant_id, - region=CONF.service.region, - dimensions=dimensions - ) - - return envelope - - def _send_logs(self, logs): - """Send the logs to Kafka.""" - try: - self._log_publisher.send_message(logs) - except Exception as ex: - LOG.exception(ex) - raise ex diff --git a/monasca_log_api/tests/test_log_processor.py b/monasca_log_api/tests/test_log_processor.py new file mode 100644 index 00000000..ac9f0f4b --- /dev/null +++ b/monasca_log_api/tests/test_log_processor.py @@ -0,0 +1,24 @@ +# Copyright 2016 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from falcon import testing + +from monasca_log_api.tests import base + + +class TestBulkProcessor(testing.TestBase): + def setUp(self): + self.conf = base.mock_config(self) + return super(TestBulkProcessor, self).setUp() diff --git a/monasca_log_api/tests/test_log_publisher.py b/monasca_log_api/tests/test_log_publisher.py index cc3779c1..5f042c22 100644 --- a/monasca_log_api/tests/test_log_publisher.py +++ b/monasca_log_api/tests/test_log_publisher.py @@ -24,6 +24,7 @@ from falcon import testing import mock from monasca_log_api.reference.common import log_publisher +from monasca_log_api.reference.common import model from monasca_log_api.tests import base EPOCH_START = datetime.datetime(1970, 1, 1) @@ -122,8 +123,9 @@ class TestSendMessage(testing.TestBase): dimension_1_value = '50' dimension_2_name = 'cpu_time' dimension_2_value = '60' - msg = { - 'log': { + + msg = model.Envelope( + log={ 'message': 1, 'application_type': application_type, 'dimensions': { @@ -131,12 +133,11 @@ class TestSendMessage(testing.TestBase): dimension_2_name: dimension_2_value } }, - 'creation_time': creation_time, - 'meta': { + meta={ 'tenantId': 1 } - } - + ) + msg['creation_time'] = creation_time instance.send_message(msg) instance._kafka_publisher.publish.assert_called_once_with( @@ -162,8 +163,8 @@ class TestSendMessage(testing.TestBase): dimension_2_name = 'cpu_time' dimension_2_value = '60' application_type = 'monasca-log-api' - msg = { - 'log': { + msg = model.Envelope( + log={ 'message': 1, 'application_type': application_type, 'dimensions': { @@ -171,11 +172,11 @@ class TestSendMessage(testing.TestBase): dimension_2_name: dimension_2_value } }, - 'creation_time': creation_time, - 'meta': { + meta={ 'tenantId': 1 } - } + ) + msg['creation_time'] = creation_time json_msg = ujson.dumps(msg) instance.send_message(msg) diff --git a/monasca_log_api/tests/test_logs_v3.py b/monasca_log_api/tests/test_logs_v3.py index bd6ff20d..19e510f0 100644 --- a/monasca_log_api/tests/test_logs_v3.py +++ b/monasca_log_api/tests/test_logs_v3.py @@ -66,25 +66,28 @@ def _generate_v3_payload(log_count): class TestLogsVersion(unittest.TestCase): - @mock.patch('monasca_log_api.reference.v3.logs.log_publisher' - '.LogPublisher') + + @mock.patch('monasca_log_api.reference.v3.common.' + 'bulk_processor.BulkProcessor') def test_should_return_v3_as_version(self, _): logs_resource = logs.Logs() self.assertEqual('v3.0', logs_resource.version) +@mock.patch('monasca_log_api.reference.common.log_publisher.producer.' + 'KafkaProducer') +@mock.patch('monasca_log_api.monitoring.client.monascastatsd.Connection') class TestLogsMonitoring(testing.TestBase): - @mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher') - def test_monitor_bulk_rejected(self, _): - resource = _init_resource(self) + def test_monitor_bulk_rejected(self, __, _): + res = _init_resource(self) - resource._logs_in_counter = in_counter = mock.Mock() - resource._logs_rejected_counter = rejected_counter = mock.Mock() - resource._bulks_rejected_counter = bulk_counter = mock.Mock() - resource._logs_size_gauge = size_gauge = mock.Mock() + in_counter = res._logs_in_counter.increment = mock.Mock() + bulk_counter = res._bulks_rejected_counter.increment = mock.Mock() + rejected_counter = res._logs_rejected_counter.increment = mock.Mock() + size_gauge = res._logs_size_gauge.send = mock.Mock() - resource._get_logs = mock.Mock( + res._get_logs = mock.Mock( side_effect=log_api_exceptions.HTTPUnprocessableEntity('')) log_count = 1 @@ -104,19 +107,18 @@ class TestLogsMonitoring(testing.TestBase): body=payload ) - self.assertEqual(1, bulk_counter.increment.call_count) - self.assertEqual(0, in_counter.increment.call_count) - self.assertEqual(0, rejected_counter.increment.call_count) - self.assertEqual(0, size_gauge.send.call_count) + self.assertEqual(1, bulk_counter.call_count) + self.assertEqual(0, in_counter.call_count) + self.assertEqual(0, rejected_counter.call_count) + self.assertEqual(0, size_gauge.call_count) - @mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher') - def test_monitor_not_all_logs_ok(self, _): - resource = _init_resource(self) + def test_monitor_not_all_logs_ok(self, __, _): + res = _init_resource(self) - resource._logs_in_counter = in_counter = mock.Mock() - resource._logs_rejected_counter = rejected_counter = mock.Mock() - resource._bulks_rejected_counter = bulk_counter = mock.Mock() - resource._logs_size_gauge = size_gauge = mock.Mock() + in_counter = res._logs_in_counter.increment = mock.Mock() + bulk_counter = res._bulks_rejected_counter.increment = mock.Mock() + rejected_counter = res._logs_rejected_counter.increment = mock.Mock() + size_gauge = res._logs_size_gauge.send = mock.Mock() log_count = 5 reject_logs = 1 @@ -124,10 +126,10 @@ class TestLogsMonitoring(testing.TestBase): payload = json.dumps(v3_body) content_length = len(payload) - side_effects = [{} for __ in xrange(log_count - reject_logs)] + side_effects = [{} for ___ in xrange(log_count - reject_logs)] side_effects.append(log_api_exceptions.HTTPUnprocessableEntity('')) - resource._get_dimensions = mock.Mock(side_effect=side_effects) + res._processor._get_dimensions = mock.Mock(side_effect=side_effects) self.simulate_request( ENDPOINT, @@ -141,30 +143,31 @@ class TestLogsMonitoring(testing.TestBase): body=payload ) - self.assertEqual(1, bulk_counter.increment.call_count) + self.assertEqual(1, bulk_counter.call_count) self.assertEqual(0, - bulk_counter.increment.mock_calls[0][2]['value']) + bulk_counter.mock_calls[0][2]['value']) - self.assertEqual(0, in_counter.increment.call_count) + self.assertEqual(1, in_counter.call_count) + self.assertEqual(log_count - reject_logs, + in_counter.mock_calls[0][2]['value']) - self.assertEqual(1, rejected_counter.increment.call_count) - self.assertEqual(log_count, - rejected_counter.increment.mock_calls[0][2]['value']) + self.assertEqual(1, rejected_counter.call_count) + self.assertEqual(reject_logs, + rejected_counter.mock_calls[0][2]['value']) - self.assertEqual(1, size_gauge.send.call_count) + self.assertEqual(1, size_gauge.call_count) self.assertEqual(content_length, - size_gauge.send.mock_calls[0][2]['value']) + size_gauge.mock_calls[0][2]['value']) - @mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher') - def test_monitor_all_logs_ok(self, _): - resource = _init_resource(self) + def test_monitor_all_logs_ok(self, __, _): + res = _init_resource(self) - resource._logs_in_counter = in_counter = mock.Mock() - resource._logs_rejected_counter = rejected_counter = mock.Mock() - resource._bulks_rejected_counter = bulk_counter = mock.Mock() - resource._logs_size_gauge = size_gauge = mock.Mock() + in_counter = res._logs_in_counter.increment = mock.Mock() + bulk_counter = res._bulks_rejected_counter.increment = mock.Mock() + rejected_counter = res._logs_rejected_counter.increment = mock.Mock() + size_gauge = res._logs_size_gauge.send = mock.Mock() - resource._send_logs = mock.Mock() + res._send_logs = mock.Mock() log_count = 10 @@ -184,16 +187,18 @@ class TestLogsMonitoring(testing.TestBase): body=payload ) - self.assertEqual(1, bulk_counter.increment.call_count) + self.assertEqual(1, bulk_counter.call_count) self.assertEqual(0, - bulk_counter.increment.mock_calls[0][2]['value']) + bulk_counter.mock_calls[0][2]['value']) - self.assertEqual(1, in_counter.increment.call_count) + self.assertEqual(1, in_counter.call_count) self.assertEqual(log_count, - in_counter.increment.mock_calls[0][2]['value']) + in_counter.mock_calls[0][2]['value']) - self.assertEqual(0, rejected_counter.increment.call_count) + self.assertEqual(1, rejected_counter.call_count) + self.assertEqual(0, + rejected_counter.mock_calls[0][2]['value']) - self.assertEqual(1, size_gauge.send.call_count) + self.assertEqual(1, size_gauge.call_count) self.assertEqual(content_length, - size_gauge.send.mock_calls[0][2]['value']) + size_gauge.mock_calls[0][2]['value']) diff --git a/monasca_log_api/tests/test_v2_v3_compare.py b/monasca_log_api/tests/test_v2_v3_compare.py index 55989d68..4dc68d4e 100644 --- a/monasca_log_api/tests/test_v2_v3_compare.py +++ b/monasca_log_api/tests/test_v2_v3_compare.py @@ -21,12 +21,13 @@ from monasca_log_api.api import logs_api from monasca_log_api.reference.v2 import logs as v2_logs from monasca_log_api.reference.v3 import logs as v3_logs -from monasca_log_api.reference.common import model - class SameV2V3Output(testing.TestBase): - @mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher') - def test_send_identical_messages(self, publisher): + + # noinspection PyProtectedMember + @mock.patch('monasca_log_api.reference.common.' + 'log_publisher.producer.KafkaProducer') + def test_send_identical_messages(self, _): # mocks only log publisher, so the last component that actually # sends data to kafka # case is to verify if publisher was called with same arguments @@ -35,8 +36,10 @@ class SameV2V3Output(testing.TestBase): v2 = v2_logs.Logs() v3 = v3_logs.Logs() - v2._kafka_publisher = publisher - v3._log_publisher = publisher + publish_mock = mock.Mock() + + v2._kafka_publisher._kafka_publisher.publish = publish_mock + v3._processor._kafka_publisher.publish = publish_mock component = 'monasca-log-api' service = 'laas' @@ -92,17 +95,24 @@ class SameV2V3Output(testing.TestBase): body=json.dumps(v3_body) ) - self.assertEqual(2, publisher.call_count) + self.assertEqual(2, publish_mock.call_count) # in v2 send_messages is called with single envelope - v2_send_msg_arg = publisher.method_calls[0][1][0] + v2_send_msg_arg = publish_mock.mock_calls[0][1][1] # in v3 it is always called with list of envelopes - v3_send_msg_arg = publisher.method_calls[1][1][0][0] + v3_send_msg_arg = publish_mock.mock_calls[1][1][1] self.maxDiff = None # at this point we know that both args should be identical self.assertEqual(type(v2_send_msg_arg), type(v3_send_msg_arg)) - self.assertIsInstance(v3_send_msg_arg, model.Envelope) - self.assertDictEqual(v2_send_msg_arg, v3_send_msg_arg) + self.assertIsInstance(v3_send_msg_arg, list) + + self.assertEqual(len(v2_send_msg_arg), len(v3_send_msg_arg)) + self.assertEqual(1, len(v2_send_msg_arg)) + + v2_msg_as_dict = json.loads(v2_send_msg_arg[0]) + v3_msg_as_dict = json.loads(v3_send_msg_arg[0]) + + self.assertDictEqual(v2_msg_as_dict, v3_msg_as_dict)