Optimize bulk processing
This commit provides several optimizations for bulk processing: * uses one loop fewer during bulk processing * refactors dimension merging * simplifies envelope validation * simplifies dimension validation (fewer ifs) Change-Id: I833d9a53a33ede44b6d5eb300e98162f70d50116
This commit is contained in:
parent
4a5d440eba
commit
91f318e8db
|
@ -17,3 +17,9 @@ monasca_log_api.reference.v3 module
|
|||
:undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
.. automodule:: monasca_log_api.reference.v3.common.bulk_processor
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ from oslo_log import log
|
|||
|
||||
from monasca_log_api.monitoring import client
|
||||
from monasca_log_api.monitoring import metrics
|
||||
from monasca_log_api.reference.common import model
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
@ -55,9 +56,6 @@ log_publisher_group = cfg.OptGroup(name='log_publisher', title='log_publisher')
|
|||
cfg.CONF.register_group(log_publisher_group)
|
||||
cfg.CONF.register_opts(log_publisher_opts, log_publisher_group)
|
||||
|
||||
ENVELOPE_SCHEMA = ['log', 'meta', 'creation_time']
|
||||
"""Log envelope (i.e.) message keys"""
|
||||
|
||||
|
||||
class InvalidMessageException(Exception):
    """Signals that a message failed validation before being published."""
|
||||
|
@ -83,7 +81,10 @@ class LogPublisher(object):
|
|||
"""
|
||||
|
||||
def __init__(self):
|
||||
|
||||
self._topics = CONF.log_publisher.topics
|
||||
self.max_message_size = CONF.log_publisher.max_message_size
|
||||
|
||||
self._kafka_publisher = producer.KafkaProducer(
|
||||
url=CONF.log_publisher.kafka_url
|
||||
)
|
||||
|
@ -106,87 +107,6 @@ class LogPublisher(object):
|
|||
|
||||
LOG.info('Initializing LogPublisher <%s>', self)
|
||||
|
||||
@staticmethod
|
||||
def _is_message_valid(message):
|
||||
"""Validates message before sending.
|
||||
|
||||
Methods checks if message is :py:class:`dict`.
|
||||
If so dictionary is verified against having following keys:
|
||||
|
||||
* meta
|
||||
* log
|
||||
* creation_time
|
||||
|
||||
If keys are found, each key must have a valueH.
|
||||
|
||||
If at least none of the conditions is met
|
||||
:py:class:`.InvalidMessageException` is raised
|
||||
|
||||
:raises InvalidMessageException: if message does not comply to schema
|
||||
|
||||
"""
|
||||
if not isinstance(message, dict):
|
||||
return False
|
||||
|
||||
for key in ENVELOPE_SCHEMA:
|
||||
if not (key in message and message.get(key)):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _truncate(self, envelope):
|
||||
"""Truncates the message if needed.
|
||||
|
||||
Each message send to kafka is verified.
|
||||
Method checks if message serialized to json
|
||||
exceeds maximum allowed size that can be posted to kafka
|
||||
queue. If so, method truncates message property of the log
|
||||
by difference between message and allowed size.
|
||||
|
||||
:param Envelope envelope: envelope to check
|
||||
:return: truncated message if size is exceeded, otherwise message
|
||||
is left unmodified
|
||||
"""
|
||||
|
||||
msg_str = rest_utils.as_json(envelope)
|
||||
|
||||
max_size = CONF.log_publisher.max_message_size
|
||||
envelope_size = ((len(bytearray(msg_str)) +
|
||||
_TIMESTAMP_KEY_SIZE +
|
||||
_KAFKA_META_DATA_SIZE)
|
||||
if msg_str is not None else -1)
|
||||
|
||||
size_diff = (envelope_size - max_size) + _TRUNCATION_SAFE_OFFSET
|
||||
|
||||
LOG.debug('_truncate(max_message_size=%d, message_size=%d, diff=%d)',
|
||||
max_size, envelope_size, size_diff)
|
||||
|
||||
if size_diff > 1:
|
||||
truncate_by = size_diff + _TRUNCATED_PROPERTY_SIZE
|
||||
|
||||
self._logs_truncated_gauge.send(
|
||||
name=None,
|
||||
value=truncate_by
|
||||
)
|
||||
|
||||
LOG.warn(('Detected message that exceeds %d bytes,'
|
||||
'message will be truncated by %d bytes'),
|
||||
max_size,
|
||||
truncate_by)
|
||||
|
||||
log_msg = envelope['log']['message']
|
||||
truncated_log_msg = log_msg[:-truncate_by]
|
||||
|
||||
envelope['log']['truncated'] = True
|
||||
envelope['log']['message'] = truncated_log_msg
|
||||
|
||||
# will just transform message once again without truncation
|
||||
return rest_utils.as_json(envelope)
|
||||
|
||||
self._logs_truncated_gauge.send(name=None, value=0)
|
||||
|
||||
return msg_str
|
||||
|
||||
def send_message(self, messages):
|
||||
"""Sends message to each configured topic.
|
||||
|
||||
|
@ -194,8 +114,8 @@ class LogPublisher(object):
|
|||
Falsy messages (i.e. empty) are not shipped to kafka
|
||||
|
||||
See also
|
||||
:py:class:`monasca_log_api.common.model.Envelope'
|
||||
:py:meth:`._is_message_valid'
|
||||
* :py:class:`monasca_log_api.common.model.Envelope`
|
||||
* :py:meth:`._is_message_valid`
|
||||
|
||||
:param dict|list messages: instance (or instances) of log envelope
|
||||
"""
|
||||
|
@ -206,18 +126,16 @@ class LogPublisher(object):
|
|||
messages = [messages]
|
||||
|
||||
sent_counter = 0
|
||||
to_sent_counter = len(messages)
|
||||
num_of_msgs = len(messages)
|
||||
|
||||
LOG.debug('About to publish %d messages to %s topics',
|
||||
to_sent_counter, self._topics)
|
||||
num_of_msgs, self._topics)
|
||||
|
||||
try:
|
||||
send_messages = []
|
||||
for message in messages:
|
||||
if not self._is_message_valid(message):
|
||||
raise InvalidMessageException()
|
||||
|
||||
msg = self._truncate(message)
|
||||
for message in messages:
|
||||
msg = self._transform_message(message)
|
||||
send_messages.append(msg)
|
||||
|
||||
with self._publish_time_ms.time(name=None):
|
||||
|
@ -229,24 +147,80 @@ class LogPublisher(object):
|
|||
LOG.exception(ex)
|
||||
raise ex
|
||||
finally:
|
||||
self._logs_published_counter.increment(value=sent_counter)
|
||||
if sent_counter == to_sent_counter:
|
||||
LOG.info('Successfully published all [%d] messages',
|
||||
sent_counter)
|
||||
else:
|
||||
failed_to_send = to_sent_counter - sent_counter
|
||||
error_str = ('Failed to sent all messages, %d '
|
||||
'messages out of %d have not been published')
|
||||
LOG.error(error_str, failed_to_send, to_sent_counter)
|
||||
self._after_publish(sent_counter, num_of_msgs)
|
||||
|
||||
self._logs_lost_counter.increment(
|
||||
value=failed_to_send
|
||||
)
|
||||
def _transform_message(self, message):
|
||||
"""Transforms message into JSON.
|
||||
|
||||
Method executes transformation operation for
|
||||
single element. Operation is set of following
|
||||
operations:
|
||||
|
||||
* checking if message is valid
|
||||
(:py:func:`.LogPublisher._is_message_valid`)
|
||||
* truncating message if necessary
|
||||
(:py:func:`.LogPublisher._truncate`)
|
||||
|
||||
:param model.Envelope message: instance of message
|
||||
:return: serialized message
|
||||
:rtype: str
|
||||
"""
|
||||
if not self._is_message_valid(message):
|
||||
raise InvalidMessageException()
|
||||
return self._truncate(message)
|
||||
|
||||
def _truncate(self, envelope):
    """Truncates the message if needed.

    The envelope is serialized to JSON and the size of that payload,
    increased by the timestamp-key and kafka meta-data allowances,
    is compared with ``self.max_message_size``. If the limit is
    exceeded, the ``message`` property of the log is shortened from
    its end, the log is flagged as ``truncated`` and the envelope is
    serialized once again.

    The amount the message was shortened by (``0`` if it was left
    untouched) is sent through the truncated-logs gauge.

    :param Envelope envelope: original envelope; it is modified in place
                              when truncation happens
    :return: serialized message
    :rtype: str
    """
    msg_str = rest_utils.as_json(envelope)

    if msg_str is None:
        # nothing was serialized, use a size that can never exceed the limit
        envelope_size = -1
    else:
        envelope_size = (len(bytearray(msg_str)) +
                         _TIMESTAMP_KEY_SIZE +
                         _KAFKA_META_DATA_SIZE)

    diff_size = ((envelope_size - self.max_message_size) +
                 _TRUNCATION_SAFE_OFFSET)

    if diff_size > 1:
        # adding the 'truncated' flag grows the payload as well
        truncated_by = diff_size + _TRUNCATED_PROPERTY_SIZE

        # LOG.warn is a deprecated alias; the literals below are
        # concatenated, hence the explicit space after the comma
        LOG.warning(('Detected message that exceeds %d bytes, '
                     'message will be truncated by %d bytes'),
                    self.max_message_size,
                    truncated_by)

        log_msg = envelope['log']['message']
        truncated_log_msg = log_msg[:-truncated_by]

        envelope['log']['truncated'] = True
        envelope['log']['message'] = truncated_log_msg
        self._logs_truncated_gauge.send(name=None, value=truncated_by)

        # serialize again, this time with the shortened message
        msg_str = rest_utils.as_json(envelope)
    else:
        self._logs_truncated_gauge.send(name=None, value=0)

    return msg_str
|
||||
|
||||
def _publish(self, messages):
|
||||
to_sent = len(messages)
|
||||
"""Publishes messages to kafka.
|
||||
|
||||
LOG.debug('Publishing %d messages', to_sent)
|
||||
:param list messages: list of messages
|
||||
"""
|
||||
num_of_msg = len(messages)
|
||||
|
||||
LOG.debug('Publishing %d messages', num_of_msg)
|
||||
|
||||
try:
|
||||
for topic in self._topics:
|
||||
|
@ -254,7 +228,39 @@ class LogPublisher(object):
|
|||
topic,
|
||||
messages
|
||||
)
|
||||
LOG.debug('Sent %d messages to topic %s', to_sent, topic)
|
||||
LOG.debug('Sent %d messages to topic %s', num_of_msg, topic)
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPServiceUnavailable('Service unavailable',
|
||||
ex.message, 60)
|
||||
|
||||
@staticmethod
|
||||
def _is_message_valid(message):
|
||||
"""Validates message before sending.
|
||||
|
||||
Methods checks if message is :py:class:`model.Envelope`.
|
||||
By being instance of this class it is ensured that all required
|
||||
keys are found and they will have their values.
|
||||
|
||||
"""
|
||||
return message and isinstance(message, model.Envelope)
|
||||
|
||||
def _after_publish(self, send_count, to_send_count):
    """Logs the publishing outcome and updates published/lost counters.

    :param int send_count: how many messages have been sent
    :param int to_send_count: how many messages should be sent

    """
    lost_count = to_send_count - send_count

    if lost_count:
        LOG.error('Failed to send all messages, %d '
                  'messages out of %d have not been published',
                  lost_count, to_send_count)
    else:
        LOG.debug('Successfully published all [%d] messages',
                  send_count)

    self._logs_published_counter.increment(value=send_count)
    self._logs_lost_counter.increment(value=lost_count)
|
||||
|
|
|
@ -84,23 +84,8 @@ def validate_application_type(application_type=None):
|
|||
validate_match()
|
||||
|
||||
|
||||
def validate_dimensions(dimensions):
|
||||
"""Validates dimensions type.
|
||||
|
||||
Empty dimensions are not being validated.
|
||||
For details see:
|
||||
|
||||
:param dict dimensions: dimensions to validate
|
||||
|
||||
* :py:data:`DIMENSION_NAME_CONSTRAINTS`
|
||||
* :py:data:`DIMENSION_VALUE_CONSTRAINTS`
|
||||
"""
|
||||
|
||||
def validate_name(name):
|
||||
if not name:
|
||||
raise exceptions.HTTPUnprocessableEntity(
|
||||
'Dimension name cannot be empty'
|
||||
)
|
||||
def _validate_dimension_name(name):
|
||||
try:
|
||||
if len(name) > DIMENSION_NAME_CONSTRAINTS['MAX_LENGTH']:
|
||||
raise exceptions.HTTPUnprocessableEntity(
|
||||
'Dimension name %s must be 255 characters or less' %
|
||||
|
@ -116,26 +101,42 @@ def validate_dimensions(dimensions):
|
|||
'Dimension name %s may not contain: %s' %
|
||||
(name, '> < = { } ( ) \' " , ; &')
|
||||
)
|
||||
except (TypeError, IndexError):
|
||||
raise exceptions.HTTPUnprocessableEntity(
|
||||
'Dimension name cannot be empty'
|
||||
)
|
||||
|
||||
def validate_value(value):
|
||||
if not value:
|
||||
raise exceptions.HTTPUnprocessableEntity(
|
||||
'Dimension value cannot be empty'
|
||||
)
|
||||
|
||||
def _validate_dimension_value(value):
    """Checks that a dimension value is not empty and not too long.

    :param value: dimension value to verify
    :raises exceptions.HTTPUnprocessableEntity: if value cannot be indexed
        or measured (``TypeError``/``IndexError``), or if it is longer
        than ``DIMENSION_VALUE_CONSTRAINTS['MAX_LENGTH']``
    """
    error_msg = None
    try:
        # indexing fails for an empty or non-subscriptable value
        value[0]
        if len(value) > DIMENSION_VALUE_CONSTRAINTS['MAX_LENGTH']:
            error_msg = ('Dimension value %s must be 255 characters or less' %
                         value)
    except (TypeError, IndexError):
        error_msg = 'Dimension value cannot be empty'

    if error_msg is not None:
        raise exceptions.HTTPUnprocessableEntity(error_msg)
|
||||
|
||||
if (isinstance(dimensions, dict) and not
|
||||
isinstance(dimensions, basestring)):
|
||||
|
||||
for dim_name in dimensions:
|
||||
validate_name(dim_name)
|
||||
validate_value(dimensions[dim_name])
|
||||
def validate_dimensions(dimensions):
|
||||
"""Validates dimensions type.
|
||||
|
||||
else:
|
||||
Empty dimensions are not being validated.
|
||||
For details see:
|
||||
|
||||
:param dict dimensions: dimensions to validate
|
||||
|
||||
* :py:data:`DIMENSION_NAME_CONSTRAINTS`
|
||||
* :py:data:`DIMENSION_VALUE_CONSTRAINTS`
|
||||
"""
|
||||
try:
|
||||
for dim_name, dim_value in dimensions.iteritems():
|
||||
_validate_dimension_name(dim_name)
|
||||
_validate_dimension_value(dim_value)
|
||||
except AttributeError:
|
||||
raise exceptions.HTTPUnprocessableEntity(
|
||||
'Dimensions %s must be a dictionary (map)' % dimensions)
|
||||
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_log_api.reference.common import log_publisher
|
||||
from monasca_log_api.reference.common import model
|
||||
from monasca_log_api.reference.common import validation
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class BulkProcessor(log_publisher.LogPublisher):
    """BulkProcessor for effective log processing and publishing.

    BulkProcessor is customized version of
    :py:class:`monasca_log_api.reference.common.log_publisher.LogPublisher`
    that utilizes processing of bulk request inside single loop.

    """

    def __init__(self, logs_in_counter, logs_rejected_counter):
        """Initializes BulkProcessor.

        :param logs_in_counter: V3 received logs counter
        :param logs_rejected_counter: V3 rejected logs counter
        """
        super(BulkProcessor, self).__init__()

        assert logs_in_counter is not None
        assert logs_rejected_counter is not None

        self._logs_in_counter = logs_in_counter
        self._logs_rejected_counter = logs_rejected_counter

        self.service_region = CONF.service.region

    def send_message(self, logs, global_dimensions=None, log_tenant_id=None):
        """Sends bulk package to kafka

        Each log is transformed into a serialized envelope; logs that
        fail the transformation are rejected (counted, not sent).
        The accepted ones are published together. Counters are
        updated regardless of the outcome.

        :param list logs: received logs, ``None`` is treated as no logs
        :param dict global_dimensions: global dimensions for each log
        :param str log_tenant_id: tenant who sent logs
        """
        # normalize once, so counting and iterating agree on empty input
        logs = logs if logs else []

        num_of_msgs = len(logs)
        sent_count = 0
        to_send_msgs = []

        LOG.debug('Bulk package <logs=%d, dimensions=%s, tenant_id=%s>',
                  num_of_msgs, global_dimensions, log_tenant_id)

        try:
            for log_el in logs:
                t_el = self._transform_message(log_el,
                                               global_dimensions,
                                               log_tenant_id)
                if t_el:
                    to_send_msgs.append(t_el)

            with self._publish_time_ms.time(name=None):
                self._publish(to_send_msgs)
                sent_count = len(to_send_msgs)

        except Exception as ex:
            LOG.error('Failed to send bulk package <logs=%d, dimensions=%s>',
                      num_of_msgs, global_dimensions)
            LOG.exception(ex)
            # bare raise keeps the original traceback
            raise
        finally:
            self._update_counters(len(to_send_msgs), num_of_msgs)
            self._after_publish(sent_count, len(to_send_msgs))

    def _update_counters(self, in_counter, to_send_counter):
        """Updates received/rejected logs counters.

        :param int in_counter: number of logs accepted for publishing
        :param int to_send_counter: number of logs received in the bulk
        """
        rejected_counter = to_send_counter - in_counter

        self._logs_in_counter.increment(value=in_counter)
        self._logs_rejected_counter.increment(value=rejected_counter)

    def _transform_message(self, log_element, *args):
        """Transforms a raw log into a serialized envelope.

        Any failure (validation, envelope creation, serialization)
        is logged and results in the log being rejected.

        :param dict log_element: raw log element
        :param args: ``args[0]`` - global dimensions,
                     ``args[1]`` - tenant id
        :return: serialized message or None if log was rejected
        """
        try:
            validation.validate_log_message(log_element)

            log_envelope = self._create_envelope(
                log_element,
                args[1],
                dimensions=self._get_dimensions(log_element,
                                                global_dims=args[0])
            )

            msg_payload = (super(BulkProcessor, self)
                           ._transform_message(log_envelope))

            return msg_payload
        except Exception as ex:
            LOG.error('Log transformation failed, rejecting log')
            LOG.exception(ex)

            return None

    def _create_envelope(self, log_element, tenant_id, dimensions=None):
        """Create a log envelope.

        :param dict log_element: raw log element
        :param str tenant_id: tenant who sent logs
        :param dict dimensions: log dimensions
        :return: log envelope
        :rtype: model.Envelope

        """
        return model.Envelope.new_envelope(
            log=log_element,
            tenant_id=tenant_id,
            region=self.service_region,
            dimensions=dimensions
        )

    def _get_dimensions(self, log_element, global_dims=None):
        """Get the dimensions of log element.

        If global dimensions are specified and passed to this method,
        both instances are merged with each other; local dimensions
        take precedence over global ones.

        If neither is specified empty dictionary is returned.

        Local dimensions, if not empty, are validated before merging.
        Missing or ``None`` local dimensions are treated as empty.

        :param dict log_element: raw log instance
        :param dict global_dims: global dimensions or None
        :return: local dimensions merged with global dimensions
        :rtype: dict
        """
        # ``or {}`` also covers an explicit "dimensions": null
        local_dims = log_element.get('dimensions') or {}

        if local_dims:
            validation.validate_dimensions(local_dims)

        # work on a copy, global dimensions are shared between logs
        dimensions = global_dims.copy() if global_dims else {}
        dimensions.update(local_dims)

        return dimensions
|
|
@ -19,6 +19,9 @@ import falcon
|
|||
from monasca_common.rest import utils as rest_utils
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_log_api.api import exceptions
|
||||
from monasca_log_api.reference.common import validation
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -37,3 +40,18 @@ def read_json_msg_body(req):
|
|||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request',
|
||||
'Request body is not valid JSON')
|
||||
|
||||
|
||||
def get_global_dimensions(request_body):
    """Reads and validates top level dimensions of the HTTP request body.

    :param dict request_body: parsed request body
    :return: value stored under ``dimensions`` key or empty dictionary
             if the key is missing
    """
    top_level_dims = request_body.get('dimensions', {})
    validation.validate_dimensions(top_level_dims)
    return top_level_dims
|
||||
|
||||
|
||||
def get_logs(request_body):
    """Reads the logs from the HTTP request body.

    :param dict request_body: parsed request body
    :return: value stored under ``logs`` key
    :raises exceptions.HTTPUnprocessableEntity: if ``logs`` key is missing
    """
    if 'logs' in request_body:
        return request_body['logs']

    raise exceptions.HTTPUnprocessableEntity(
        'Unprocessable Entity Logs not found')
|
||||
|
|
|
@ -21,9 +21,8 @@ from monasca_log_api.api import exceptions
|
|||
from monasca_log_api.api import headers
|
||||
from monasca_log_api.api import logs_api
|
||||
from monasca_log_api.monitoring import metrics
|
||||
from monasca_log_api.reference.common import log_publisher
|
||||
from monasca_log_api.reference.common import model
|
||||
from monasca_log_api.reference.common import validation
|
||||
from monasca_log_api.reference.v3.common import bulk_processor
|
||||
from monasca_log_api.reference.v3.common import helpers
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -37,8 +36,11 @@ class Logs(logs_api.LogsApi):
|
|||
|
||||
def __init__(self):
|
||||
super(Logs, self).__init__()
|
||||
self._log_publisher = log_publisher.LogPublisher()
|
||||
|
||||
self._processor = bulk_processor.BulkProcessor(
|
||||
logs_in_counter=self._logs_in_counter,
|
||||
logs_rejected_counter=self._logs_rejected_counter
|
||||
)
|
||||
self._bulks_rejected_counter = self._statsd.get_counter(
|
||||
name=metrics.LOGS_BULKS_REJECTED_METRIC,
|
||||
dimensions=self._metrics_dimensions
|
||||
|
@ -77,91 +79,29 @@ class Logs(logs_api.LogsApi):
|
|||
self._logs_size_gauge.send(name=None,
|
||||
value=int(req.content_length))
|
||||
|
||||
envelopes = []
|
||||
try:
|
||||
for log_element in log_list:
|
||||
LOG.trace('Processing log %s', log_element)
|
||||
|
||||
validation.validate_log_message(log_element)
|
||||
|
||||
dimensions = self._get_dimensions(log_element,
|
||||
global_dimensions)
|
||||
envelope = self._create_log_envelope(tenant_id,
|
||||
cross_tenant_id,
|
||||
dimensions,
|
||||
log_element)
|
||||
envelopes.append(envelope)
|
||||
|
||||
LOG.trace('Log %s processed into envelope %s',
|
||||
log_element,
|
||||
envelope)
|
||||
self._processor.send_message(
|
||||
logs=log_list,
|
||||
global_dimensions=global_dimensions,
|
||||
log_tenant_id=tenant_id if tenant_id else cross_tenant_id
|
||||
)
|
||||
except Exception as ex:
|
||||
LOG.error('Failed to process log %s', log_element)
|
||||
LOG.exception(ex)
|
||||
res.status = getattr(ex, 'status', falcon.HTTP_500)
|
||||
return
|
||||
finally:
|
||||
rejected_logs = len(envelopes) - len(log_list)
|
||||
# if entire bulk is rejected because of single error
|
||||
# that means only one counter must be called
|
||||
if rejected_logs < 0:
|
||||
self._logs_rejected_counter.increment(value=len(log_list))
|
||||
else:
|
||||
self._logs_in_counter.increment(value=len(log_list))
|
||||
|
||||
# at this point only possible metrics regard
|
||||
# publishing phase
|
||||
self._send_logs(envelopes)
|
||||
res.status = falcon.HTTP_204
|
||||
|
||||
def _get_dimensions(self, log_element, global_dims):
|
||||
"""Get the dimensions in the log element."""
|
||||
local_dims = log_element.get('dimensions', {})
|
||||
if local_dims:
|
||||
validation.validate_dimensions(local_dims)
|
||||
if global_dims:
|
||||
dimensions = global_dims.copy()
|
||||
dimensions.update(local_dims)
|
||||
else:
|
||||
dimensions = local_dims
|
||||
else:
|
||||
dimensions = global_dims
|
||||
|
||||
return dimensions
|
||||
|
||||
def _get_global_dimensions(self, request_body):
|
||||
@staticmethod
|
||||
def _get_global_dimensions(request_body):
|
||||
"""Get the top level dimensions in the HTTP request body."""
|
||||
global_dims = request_body.get('dimensions', {})
|
||||
validation.validate_dimensions(global_dims)
|
||||
return global_dims
|
||||
|
||||
def _get_logs(self, request_body):
|
||||
@staticmethod
|
||||
def _get_logs(request_body):
|
||||
"""Get the logs in the HTTP request body."""
|
||||
if 'logs' not in request_body:
|
||||
raise exceptions.HTTPUnprocessableEntity(
|
||||
'Unprocessable Entity Logs not found')
|
||||
return request_body['logs']
|
||||
|
||||
def _create_log_envelope(self,
|
||||
tenant_id,
|
||||
cross_tenant_id,
|
||||
dimensions=None,
|
||||
log_element=None):
|
||||
"""Create a log envelope and return it as a json string."""
|
||||
|
||||
envelope = model.Envelope.new_envelope(
|
||||
log=log_element,
|
||||
tenant_id=tenant_id if tenant_id else cross_tenant_id,
|
||||
region=CONF.service.region,
|
||||
dimensions=dimensions
|
||||
)
|
||||
|
||||
return envelope
|
||||
|
||||
def _send_logs(self, logs):
|
||||
"""Send the logs to Kafka."""
|
||||
try:
|
||||
self._log_publisher.send_message(logs)
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
raise ex
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from falcon import testing
|
||||
|
||||
from monasca_log_api.tests import base
|
||||
|
||||
|
||||
class TestBulkProcessor(testing.TestBase):
    """Test case skeleton for the bulk processor."""

    def setUp(self):
        # configuration is mocked before the base class set-up runs
        self.conf = base.mock_config(self)
        parent = super(TestBulkProcessor, self)
        return parent.setUp()
|
|
@ -24,6 +24,7 @@ from falcon import testing
|
|||
import mock
|
||||
|
||||
from monasca_log_api.reference.common import log_publisher
|
||||
from monasca_log_api.reference.common import model
|
||||
from monasca_log_api.tests import base
|
||||
|
||||
EPOCH_START = datetime.datetime(1970, 1, 1)
|
||||
|
@ -122,8 +123,9 @@ class TestSendMessage(testing.TestBase):
|
|||
dimension_1_value = '50'
|
||||
dimension_2_name = 'cpu_time'
|
||||
dimension_2_value = '60'
|
||||
msg = {
|
||||
'log': {
|
||||
|
||||
msg = model.Envelope(
|
||||
log={
|
||||
'message': 1,
|
||||
'application_type': application_type,
|
||||
'dimensions': {
|
||||
|
@ -131,12 +133,11 @@ class TestSendMessage(testing.TestBase):
|
|||
dimension_2_name: dimension_2_value
|
||||
}
|
||||
},
|
||||
'creation_time': creation_time,
|
||||
'meta': {
|
||||
meta={
|
||||
'tenantId': 1
|
||||
}
|
||||
}
|
||||
|
||||
)
|
||||
msg['creation_time'] = creation_time
|
||||
instance.send_message(msg)
|
||||
|
||||
instance._kafka_publisher.publish.assert_called_once_with(
|
||||
|
@ -162,8 +163,8 @@ class TestSendMessage(testing.TestBase):
|
|||
dimension_2_name = 'cpu_time'
|
||||
dimension_2_value = '60'
|
||||
application_type = 'monasca-log-api'
|
||||
msg = {
|
||||
'log': {
|
||||
msg = model.Envelope(
|
||||
log={
|
||||
'message': 1,
|
||||
'application_type': application_type,
|
||||
'dimensions': {
|
||||
|
@ -171,11 +172,11 @@ class TestSendMessage(testing.TestBase):
|
|||
dimension_2_name: dimension_2_value
|
||||
}
|
||||
},
|
||||
'creation_time': creation_time,
|
||||
'meta': {
|
||||
meta={
|
||||
'tenantId': 1
|
||||
}
|
||||
}
|
||||
)
|
||||
msg['creation_time'] = creation_time
|
||||
json_msg = ujson.dumps(msg)
|
||||
|
||||
instance.send_message(msg)
|
||||
|
|
|
@ -66,25 +66,28 @@ def _generate_v3_payload(log_count):
|
|||
|
||||
|
||||
class TestLogsVersion(unittest.TestCase):
|
||||
@mock.patch('monasca_log_api.reference.v3.logs.log_publisher'
|
||||
'.LogPublisher')
|
||||
|
||||
@mock.patch('monasca_log_api.reference.v3.common.'
|
||||
'bulk_processor.BulkProcessor')
|
||||
def test_should_return_v3_as_version(self, _):
|
||||
logs_resource = logs.Logs()
|
||||
self.assertEqual('v3.0', logs_resource.version)
|
||||
|
||||
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.producer.'
|
||||
'KafkaProducer')
|
||||
@mock.patch('monasca_log_api.monitoring.client.monascastatsd.Connection')
|
||||
class TestLogsMonitoring(testing.TestBase):
|
||||
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher')
|
||||
def test_monitor_bulk_rejected(self, _):
|
||||
resource = _init_resource(self)
|
||||
def test_monitor_bulk_rejected(self, __, _):
|
||||
res = _init_resource(self)
|
||||
|
||||
resource._logs_in_counter = in_counter = mock.Mock()
|
||||
resource._logs_rejected_counter = rejected_counter = mock.Mock()
|
||||
resource._bulks_rejected_counter = bulk_counter = mock.Mock()
|
||||
resource._logs_size_gauge = size_gauge = mock.Mock()
|
||||
in_counter = res._logs_in_counter.increment = mock.Mock()
|
||||
bulk_counter = res._bulks_rejected_counter.increment = mock.Mock()
|
||||
rejected_counter = res._logs_rejected_counter.increment = mock.Mock()
|
||||
size_gauge = res._logs_size_gauge.send = mock.Mock()
|
||||
|
||||
resource._get_logs = mock.Mock(
|
||||
res._get_logs = mock.Mock(
|
||||
side_effect=log_api_exceptions.HTTPUnprocessableEntity(''))
|
||||
|
||||
log_count = 1
|
||||
|
@ -104,19 +107,18 @@ class TestLogsMonitoring(testing.TestBase):
|
|||
body=payload
|
||||
)
|
||||
|
||||
self.assertEqual(1, bulk_counter.increment.call_count)
|
||||
self.assertEqual(0, in_counter.increment.call_count)
|
||||
self.assertEqual(0, rejected_counter.increment.call_count)
|
||||
self.assertEqual(0, size_gauge.send.call_count)
|
||||
self.assertEqual(1, bulk_counter.call_count)
|
||||
self.assertEqual(0, in_counter.call_count)
|
||||
self.assertEqual(0, rejected_counter.call_count)
|
||||
self.assertEqual(0, size_gauge.call_count)
|
||||
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher')
|
||||
def test_monitor_not_all_logs_ok(self, _):
|
||||
resource = _init_resource(self)
|
||||
def test_monitor_not_all_logs_ok(self, __, _):
|
||||
res = _init_resource(self)
|
||||
|
||||
resource._logs_in_counter = in_counter = mock.Mock()
|
||||
resource._logs_rejected_counter = rejected_counter = mock.Mock()
|
||||
resource._bulks_rejected_counter = bulk_counter = mock.Mock()
|
||||
resource._logs_size_gauge = size_gauge = mock.Mock()
|
||||
in_counter = res._logs_in_counter.increment = mock.Mock()
|
||||
bulk_counter = res._bulks_rejected_counter.increment = mock.Mock()
|
||||
rejected_counter = res._logs_rejected_counter.increment = mock.Mock()
|
||||
size_gauge = res._logs_size_gauge.send = mock.Mock()
|
||||
|
||||
log_count = 5
|
||||
reject_logs = 1
|
||||
|
@ -124,10 +126,10 @@ class TestLogsMonitoring(testing.TestBase):
|
|||
payload = json.dumps(v3_body)
|
||||
content_length = len(payload)
|
||||
|
||||
side_effects = [{} for __ in xrange(log_count - reject_logs)]
|
||||
side_effects = [{} for ___ in xrange(log_count - reject_logs)]
|
||||
side_effects.append(log_api_exceptions.HTTPUnprocessableEntity(''))
|
||||
|
||||
resource._get_dimensions = mock.Mock(side_effect=side_effects)
|
||||
res._processor._get_dimensions = mock.Mock(side_effect=side_effects)
|
||||
|
||||
self.simulate_request(
|
||||
ENDPOINT,
|
||||
|
@ -141,30 +143,31 @@ class TestLogsMonitoring(testing.TestBase):
|
|||
body=payload
|
||||
)
|
||||
|
||||
self.assertEqual(1, bulk_counter.increment.call_count)
|
||||
self.assertEqual(1, bulk_counter.call_count)
|
||||
self.assertEqual(0,
|
||||
bulk_counter.increment.mock_calls[0][2]['value'])
|
||||
bulk_counter.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(0, in_counter.increment.call_count)
|
||||
self.assertEqual(1, in_counter.call_count)
|
||||
self.assertEqual(log_count - reject_logs,
|
||||
in_counter.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(1, rejected_counter.increment.call_count)
|
||||
self.assertEqual(log_count,
|
||||
rejected_counter.increment.mock_calls[0][2]['value'])
|
||||
self.assertEqual(1, rejected_counter.call_count)
|
||||
self.assertEqual(reject_logs,
|
||||
rejected_counter.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(1, size_gauge.send.call_count)
|
||||
self.assertEqual(1, size_gauge.call_count)
|
||||
self.assertEqual(content_length,
|
||||
size_gauge.send.mock_calls[0][2]['value'])
|
||||
size_gauge.mock_calls[0][2]['value'])
|
||||
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher')
|
||||
def test_monitor_all_logs_ok(self, _):
|
||||
resource = _init_resource(self)
|
||||
def test_monitor_all_logs_ok(self, __, _):
|
||||
res = _init_resource(self)
|
||||
|
||||
resource._logs_in_counter = in_counter = mock.Mock()
|
||||
resource._logs_rejected_counter = rejected_counter = mock.Mock()
|
||||
resource._bulks_rejected_counter = bulk_counter = mock.Mock()
|
||||
resource._logs_size_gauge = size_gauge = mock.Mock()
|
||||
in_counter = res._logs_in_counter.increment = mock.Mock()
|
||||
bulk_counter = res._bulks_rejected_counter.increment = mock.Mock()
|
||||
rejected_counter = res._logs_rejected_counter.increment = mock.Mock()
|
||||
size_gauge = res._logs_size_gauge.send = mock.Mock()
|
||||
|
||||
resource._send_logs = mock.Mock()
|
||||
res._send_logs = mock.Mock()
|
||||
|
||||
log_count = 10
|
||||
|
||||
|
@ -184,16 +187,18 @@ class TestLogsMonitoring(testing.TestBase):
|
|||
body=payload
|
||||
)
|
||||
|
||||
self.assertEqual(1, bulk_counter.increment.call_count)
|
||||
self.assertEqual(1, bulk_counter.call_count)
|
||||
self.assertEqual(0,
|
||||
bulk_counter.increment.mock_calls[0][2]['value'])
|
||||
bulk_counter.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(1, in_counter.increment.call_count)
|
||||
self.assertEqual(1, in_counter.call_count)
|
||||
self.assertEqual(log_count,
|
||||
in_counter.increment.mock_calls[0][2]['value'])
|
||||
in_counter.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(0, rejected_counter.increment.call_count)
|
||||
self.assertEqual(1, rejected_counter.call_count)
|
||||
self.assertEqual(0,
|
||||
rejected_counter.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(1, size_gauge.send.call_count)
|
||||
self.assertEqual(1, size_gauge.call_count)
|
||||
self.assertEqual(content_length,
|
||||
size_gauge.send.mock_calls[0][2]['value'])
|
||||
size_gauge.mock_calls[0][2]['value'])
|
||||
|
|
|
@ -21,12 +21,13 @@ from monasca_log_api.api import logs_api
|
|||
from monasca_log_api.reference.v2 import logs as v2_logs
|
||||
from monasca_log_api.reference.v3 import logs as v3_logs
|
||||
|
||||
from monasca_log_api.reference.common import model
|
||||
|
||||
|
||||
class SameV2V3Output(testing.TestBase):
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher')
|
||||
def test_send_identical_messages(self, publisher):
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
@mock.patch('monasca_log_api.reference.common.'
|
||||
'log_publisher.producer.KafkaProducer')
|
||||
def test_send_identical_messages(self, _):
|
||||
# mocks only log publisher, so the last component that actually
|
||||
# sends data to kafka
|
||||
# case is to verify if publisher was called with same arguments
|
||||
|
@ -35,8 +36,10 @@ class SameV2V3Output(testing.TestBase):
|
|||
v2 = v2_logs.Logs()
|
||||
v3 = v3_logs.Logs()
|
||||
|
||||
v2._kafka_publisher = publisher
|
||||
v3._log_publisher = publisher
|
||||
publish_mock = mock.Mock()
|
||||
|
||||
v2._kafka_publisher._kafka_publisher.publish = publish_mock
|
||||
v3._processor._kafka_publisher.publish = publish_mock
|
||||
|
||||
component = 'monasca-log-api'
|
||||
service = 'laas'
|
||||
|
@ -92,17 +95,24 @@ class SameV2V3Output(testing.TestBase):
|
|||
body=json.dumps(v3_body)
|
||||
)
|
||||
|
||||
self.assertEqual(2, publisher.call_count)
|
||||
self.assertEqual(2, publish_mock.call_count)
|
||||
|
||||
# in v2 send_messages is called with single envelope
|
||||
v2_send_msg_arg = publisher.method_calls[0][1][0]
|
||||
v2_send_msg_arg = publish_mock.mock_calls[0][1][1]
|
||||
|
||||
# in v3 it is always called with list of envelopes
|
||||
v3_send_msg_arg = publisher.method_calls[1][1][0][0]
|
||||
v3_send_msg_arg = publish_mock.mock_calls[1][1][1]
|
||||
|
||||
self.maxDiff = None
|
||||
|
||||
# at this point we know that both args should be identical
|
||||
self.assertEqual(type(v2_send_msg_arg), type(v3_send_msg_arg))
|
||||
self.assertIsInstance(v3_send_msg_arg, model.Envelope)
|
||||
self.assertDictEqual(v2_send_msg_arg, v3_send_msg_arg)
|
||||
self.assertIsInstance(v3_send_msg_arg, list)
|
||||
|
||||
self.assertEqual(len(v2_send_msg_arg), len(v3_send_msg_arg))
|
||||
self.assertEqual(1, len(v2_send_msg_arg))
|
||||
|
||||
v2_msg_as_dict = json.loads(v2_send_msg_arg[0])
|
||||
v3_msg_as_dict = json.loads(v3_send_msg_arg[0])
|
||||
|
||||
self.assertDictEqual(v2_msg_as_dict, v3_msg_as_dict)
|
||||
|
|
Loading…
Reference in New Issue