Truncating too large messages

If a message that is about to be sent to Kafka exceeds the maximum
allowed size (i.e. log_publisher.max_message_size), the log's message,
which is part of the envelope, is truncated by the difference between
the maximum and the current size. This way the message can still be
sent to Kafka instead of being dropped.

Change-Id: Ide1d369a0f58efb3a56b22c47118d2aa062fdc09
Tomasz Trębski 2016-05-23 14:11:58 +02:00
parent e9ca59c1c1
commit 264ddd3c3f
8 changed files with 229 additions and 106 deletions

View File

@ -68,6 +68,31 @@ in given case. However two mentioned in this documentation are required.
All fields, apart from **creation_time** and **log**, are created from HTTP headers.
Description is available [here](/docs/monasca-log-api-spec.md).
## Truncating too large messages
The following section applies mostly to monasca-log-api v3.0.
Each *envelope* sent to Kafka is serialized into a JSON string. This string
must comply with Kafka's limitation on the [maximum message size](https://kafka.apache.org/08/configuration.html).
If the JSON message is too big, the following actions are taken (see the
sketch below):
1) The difference between the maximum allowed size and the JSON message size
(both in bytes) is calculated:
```diff = (size(json_envelope) + size(envelope_key) + KAFKA_METADATA_SIZE) - maximum_allowed_size + TRUNCATION_SAFE_OFFSET```.
**KAFKA_METADATA_SIZE** is the number of bytes Kafka adds while transforming
each message prior to sending it.
2) The log is enriched with the property **truncated** set to **true**
(```log['truncated'] = True```).
3) The log's message is truncated by ```diff + TRUNCATED_PROPERTY_SIZE```.
**TRUNCATED_PROPERTY_SIZE** is the size of the newly added property.
Explanation of variables:
* **envelope_key** is the key used when routing logs into specific Kafka
partitions. Its byte size is always fixed (determined from the byte size of
a timestamp represented as a string):
```len(bytearray(str(int(time.time() * 1000)).encode('utf-8')))```
* **KAFKA_METADATA_SIZE** equals 32 bytes (matching **_KAFKA_META_DATA_SIZE**
in log_publisher.py).
* **TRUNCATION_SAFE_OFFSET** equals 1, ensuring that the diff size is always
a positive number.
* **TRUNCATED_PROPERTY_SIZE** is calculated once per run of log-api as the
byte size of the serialized property ```"truncated": true```.
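A minimal sketch of this arithmetic (the helper name and standalone constants
are illustrative; the actual implementation lives in
```LogPublisher._truncate```):

```python
import time

KAFKA_METADATA_SIZE = 32    # bytes Kafka adds to each message
TRUNCATION_SAFE_OFFSET = 1  # keeps the computed diff strictly positive
# byte size of the serialized property '"truncated": true'
TRUNCATED_PROPERTY_SIZE = len(bytearray('"truncated": true'.encode('utf-8')))
# fixed byte size of the routing key: a millisecond timestamp as a string
ENVELOPE_KEY_SIZE = len(
    bytearray(str(int(time.time() * 1000)).encode('utf-8')))


def bytes_to_truncate(json_envelope, maximum_allowed_size):
    """Returns how many bytes to cut from the log's message (0 if it fits)."""
    diff = (len(bytearray(json_envelope.encode('utf-8'))) +
            ENVELOPE_KEY_SIZE +
            KAFKA_METADATA_SIZE) - maximum_allowed_size + TRUNCATION_SAFE_OFFSET
    if diff > TRUNCATION_SAFE_OFFSET:
        # room must also be made for the new "truncated" property
        return diff + TRUNCATED_PROPERTY_SIZE
    return 0
```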
## Configuration
### Java
@ -114,4 +139,4 @@ There are only two relevant options:
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# under the License.

View File

@ -1,8 +1,8 @@
# Monasca Log API
Date: April 18, 2016
Date: May 27, 2016
Document Version: v2.2.1
Document Version: v2.2.2
# Logs
The logs resource allows logs to be created.
@ -10,7 +10,6 @@ The logs resource allows logs to be created.
## Create Logs
Create logs.
### POST /v3.0/logs
#### Headers
@ -26,7 +25,7 @@ None.
#### Request Body
JSON object which can have a maximum size of 5 MB. It consists of global
dimensions (optional) and an array of logs. Each single log message with
resulting Kafka envelope can have a maximum size of 1 MB.
resulting envelope can have a maximum size of 1 MB.
Dimensions is a dictionary of key-value pairs and should be consistent with
metric dimensions.
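For illustration only, a client could assemble and size-check such a body as
follows (field names are assumptions drawn from this description, not an
exhaustive schema):

```python
import json

# illustrative body: optional global dimensions plus an array of logs
body = {
    'dimensions': {'hostname': 'mini-mon', 'service': 'monitoring'},
    'logs': [
        {'message': 'WARNING: disk usage above 90%'}
    ]
}
payload = json.dumps(body)

# the whole payload must stay within the 5 MB limit
assert len(payload.encode('utf-8')) <= 5 * 1024 * 1024
```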
@ -208,4 +207,4 @@ This request does not return a response body.
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# under the License.

View File

@ -13,17 +13,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from monasca_common.kafka import producer
from monasca_common.rest import utils as rest_utils
from oslo_config import cfg
from oslo_log import log
from monasca_log_api.reference.common import validation
LOG = log.getLogger(__name__)
CONF = cfg.CONF
_MAX_MESSAGE_SIZE = 1048576
_RETRY_AFTER = 60
# fixed byte size of the partition key: a millisecond timestamp as a string
_TIMESTAMP_KEY_SIZE = len(
    bytearray(str(int(time.time() * 1000)).encode('utf-8')))
# byte size of the property appended to truncated logs
_TRUNCATED_PROPERTY_SIZE = len(
    bytearray('"truncated": true'.encode('utf-8')))
# number of bytes Kafka adds to each message prior to sending it
_KAFKA_META_DATA_SIZE = 32
# keeps the computed size difference strictly positive
_TRUNCATION_SAFE_OFFSET = 1
log_publisher_opts = [
    cfg.StrOpt('kafka_url',
@ -106,6 +113,52 @@ class LogPublisher(object):
        return True
    def _truncate(self, envelope):
        """Truncates the message if needed.

        Each message sent to Kafka is verified. The method checks
        if the message serialized to JSON exceeds the maximum allowed
        size that can be posted to the Kafka queue. If so, it truncates
        the message property of the log by the difference between the
        actual and the allowed size.

        :param Envelope envelope: envelope to check
        :return: truncated message if the size is exceeded, otherwise
                 the message is left unmodified
        """
        msg_str = rest_utils.as_json(envelope)
        max_size = CONF.log_publisher.max_message_size

        envelope_size = ((len(bytearray(msg_str)) +
                          _TIMESTAMP_KEY_SIZE +
                          _KAFKA_META_DATA_SIZE)
                         if msg_str is not None else -1)
        size_diff = (envelope_size - max_size) + _TRUNCATION_SAFE_OFFSET

        LOG.debug('_truncate(max_message_size=%d, message_size=%d, diff=%d)',
                  max_size, envelope_size, size_diff)
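        # NOTE: with _TRUNCATION_SAFE_OFFSET == 1, an envelope whose size
        # equals max_size yields size_diff == 1 and is left untouched;
        # only strictly larger envelopes enter the branch below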
        if size_diff > 1:
            truncate_by = size_diff + _TRUNCATED_PROPERTY_SIZE

            LOG.warn(('Detected message that exceeds %d bytes, '
                      'message will be truncated by %d bytes'),
                     max_size,
                     truncate_by)

            log_msg = envelope['log']['message']
            truncated_log_msg = log_msg[:-truncate_by]

            envelope['log']['truncated'] = True
            envelope['log']['message'] = truncated_log_msg

            # serialize the envelope once more, this time without truncation
            return rest_utils.as_json(envelope)
        return msg_str
    def send_message(self, messages):
        """Sends message to each configured topic.
@ -136,12 +189,12 @@ class LogPublisher(object):
                if not self._is_message_valid(message):
                    raise InvalidMessageException()
                msg = rest_utils.as_json(message)
                validation.validate_envelope_size(msg)
                msg = self._truncate(message)
                send_messages.append(msg)

            for topic in self._topics:
                self._kafka_publisher.publish(topic, send_messages)

            sent_counter = to_sent_counter
        except Exception as ex:
            LOG.error('Failure in publishing messages to kafka')

View File

@ -13,7 +13,6 @@
# under the License.
import re
import sys
import falcon
from oslo_config import cfg
@ -213,33 +212,6 @@ def validate_payload_size(req):
)
def validate_envelope_size(envelope=None):
    """Validates envelope size before sending to Kafka.

    Validates a case similar to
    :py:meth:`.Validations.validate_payload_size`. The difference is
    that this method checks whether the log envelope (already
    serialized) can be safely sent to Kafka.

    For more information, check the Kafka documentation regarding
    the MessageSizeTooLarge exception.

    :param str envelope: serialized envelope
    :exception: :py:class:`falcon.HTTPInternalServerError`
    """
    max_size = CONF.log_publisher.max_message_size
    envelope_size = sys.getsizeof(envelope) if envelope is not None else -1

    LOG.debug('Envelope size is %s', envelope_size)

    if envelope_size >= max_size:
        error_msg = (envelope_size, max_size)
        raise falcon.HTTPInternalServerError(
            title='Kafka message size exceeded',
            description='%d exceeds maximum allowed size %d bytes' % error_msg
        )
def validate_is_delegate(role):
    if role:
        role = role.split(',')

View File

@ -13,7 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import random
import string
import ujson
import unittest
@ -26,6 +29,16 @@ from monasca_log_api.tests import base
EPOCH_START = datetime.datetime(1970, 1, 1)
def _generate_unique_message(size):
    letters = string.ascii_lowercase

    def rand(amount, space=True):
        space = ' ' if space else ''
        return ''.join((random.choice(letters + space) for _ in range(amount)))

    return rand(size)
class TestSendMessage(testing.TestBase):
    def setUp(self):
        self.conf = base.mock_config(self)
@ -47,6 +60,30 @@ class TestSendMessage(testing.TestBase):
        not_dict_value = 123
        instance.send_message(not_dict_value)
    @mock.patch('monasca_log_api.reference.common.log_publisher.producer'
                '.KafkaProducer')
    def test_should_not_send_message_missing_keys(self, _):
        # checks every combination of missing keys
        # test does not rely on those keys having a value or not,
        # it simply assumes that values are set but important
        # message (i.e. envelope) properties are missing entirely
        # that's why there are two loops instead of three
        instance = log_publisher.LogPublisher()
        keys = ['log', 'creation_time', 'meta']

        for key_1 in keys:
            diff = keys[:]
            diff.remove(key_1)

            for key_2 in diff:
                message = {
                    key_1: random.randint(10, 20),
                    key_2: random.randint(30, 50)
                }
                self.assertRaises(log_publisher.InvalidMessageException,
                                  instance.send_message,
                                  message)
    @mock.patch('monasca_log_api.reference.common.log_publisher.producer'
                '.KafkaProducer')
    def test_should_not_send_message_missing_values(self, _):
@ -110,8 +147,9 @@ class TestSendMessage(testing.TestBase):
                '.KafkaProducer')
    def test_should_send_message_multiple_topics(self, _):
        topics = ['logs', 'analyzer', 'tester']
        self.conf.config(topics=topics, group='log_publisher')
        self.conf.config(max_log_size=5000, group='service')
        self.conf.config(topics=topics,
                         max_message_size=5000,
                         group='log_publisher')

        instance = log_publisher.LogPublisher()
        instance._kafka_publisher = mock.Mock()
@ -148,3 +186,92 @@ class TestSendMessage(testing.TestBase):
            instance._kafka_publisher.publish.assert_any_call(
                topic,
                [json_msg])
class TestTruncation(testing.TestBase):
    EXTRA_CHARS_SIZE = len(bytearray(ujson.dumps({
        'log': {
            'message': None
        }
    }))) - 2
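    # NOTE: the skeleton above serializes to '{"log":{"message":null}}';
    # minus 2 because replacing the 4-byte null with a quoted string adds
    # two quote characters, so the constant equals the JSON overhead
    # around the raw message bytes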
    def __init__(self, *args, **kwargs):
        super(TestTruncation, self).__init__(*args, **kwargs)
        self._conf = None

    def setUp(self):
        super(TestTruncation, self).setUp()
        self._conf = base.mock_config(self)

    @mock.patch(
        'monasca_log_api.reference.common.log_publisher.producer'
        '.KafkaProducer')
    def test_should_not_truncate_message_if_size_is_smaller(self, _):
        diff_size = random.randint(1, 100)
        self._run_truncate_test(log_size_factor=-diff_size,
                                truncate_by=0)

    @mock.patch(
        'monasca_log_api.reference.common.log_publisher.producer'
        '.KafkaProducer')
    def test_should_not_truncate_message_if_size_equal_to_max(self, _):
        self._run_truncate_test(log_size_factor=0,
                                truncate_by=0)

    @mock.patch(
        'monasca_log_api.reference.common.log_publisher.producer'
        '.KafkaProducer')
    def test_should_truncate_too_big_message(self, _):
        diff_size = random.randint(1, 100)
        max_size = 1000
        truncate_by = ((max_size -
                        (max_size - log_publisher._TRUNCATED_PROPERTY_SIZE)) +
                       log_publisher._TRUNCATION_SAFE_OFFSET + diff_size)
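        # NOTE: the expression above reduces to _TRUNCATED_PROPERTY_SIZE +
        # _TRUNCATION_SAFE_OFFSET + diff_size, since max_size cancels out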
        self._run_truncate_test(max_message_size=1000,
                                log_size_factor=diff_size,
                                truncate_by=truncate_by)

    def _run_truncate_test(self,
                           max_message_size=1000,
                           log_size_factor=0,
                           truncate_by=0,
                           gen_fn=_generate_unique_message):
        log_size = (max_message_size -
                    TestTruncation.EXTRA_CHARS_SIZE -
                    log_publisher._KAFKA_META_DATA_SIZE -
                    log_publisher._TIMESTAMP_KEY_SIZE +
                    log_size_factor)
        expected_log_message_size = log_size - truncate_by

        self._conf.config(
            group='log_publisher',
            max_message_size=max_message_size
        )

        log_msg = gen_fn(log_size)
        envelope = {
            'log': {
                'message': log_msg
            }
        }

        instance = log_publisher.LogPublisher()

        envelope_copy = copy.deepcopy(envelope)
        json_envelope = instance._truncate(envelope_copy)

        parsed_envelope = ujson.loads(json_envelope)
        parsed_log_message = parsed_envelope['log']['message']
        parsed_log_message_len = len(parsed_log_message)

        if truncate_by > 0:
            self.assertNotEqual(envelope['log']['message'],
                                parsed_log_message)
        else:
            self.assertEqual(envelope['log']['message'],
                             parsed_log_message)

        self.assertEqual(expected_log_message_size, parsed_log_message_len)

View File

@ -14,8 +14,6 @@
# under the License.
import datetime
import random
import string
import unittest
from falcon import errors
@ -322,49 +320,6 @@ class PayloadSizeValidations(testing.TestBase):
)
class EnvelopeSizeValidations(testing.TestBase):
    @staticmethod
    def _rand_str(size):
        return ''.join((random.choice(string.letters) for _ in range(size)))

    def setUp(self):
        self.conf = base.mock_config(self)
        return super(EnvelopeSizeValidations, self).setUp()

    def test_should_pass_envelope_size_ok(self):
        envelope = self._rand_str(120)
        max_message_size = 240
        self.conf.config(max_message_size=max_message_size,
                         group='log_publisher')

        validation.validate_envelope_size(envelope)

    def test_should_pass_envelope_size_exceeded(self):
        envelope = self._rand_str(360)
        max_message_size = 240
        self.conf.config(max_message_size=max_message_size,
                         group='log_publisher')

        self.assertRaises(
            errors.HTTPInternalServerError,
            validation.validate_envelope_size,
            envelope
        )

    def test_should_pass_envelope_size_equal(self):
        envelope = self._rand_str(240)
        max_message_size = 240
        self.conf.config(max_message_size=max_message_size,
                         group='log_publisher')

        self.assertRaises(
            errors.HTTPInternalServerError,
            validation.validate_envelope_size,
            envelope
        )


class LogMessageValidations(testing.TestBase):
    def test_should_pass_message_in_log_property(self):

View File

@ -92,24 +92,3 @@ class TestLogApiConstraints(base.BaseLogsTestCase):
            return

        self.assertTrue(False, 'API should respond with 413')
    @test.attr(type='gate')
    def test_should_accept_message_but_reject_after_adding_metadata(self):
        _, message = base.generate_unique_message(
            size=base._get_message_size(0.99981))
        headers = base._get_headers(self.logs_client.get_headers())
        data = base._get_data(message)

        try:
            self.logs_client.send_single_log(data, headers)
        except exceptions.ServerFault as urc:
            self.assertEqual(500, urc.resp.status)

            msg = urc.resp_body.get('title', None)
            # in Java that is under message
            if msg is None:
                msg = urc.resp_body.get('message', None)

            self.assertIsNotNone(msg, 'Should get status message')
            self.assertEqual('Envelope size exceeded', msg)
            return

        self.assertTrue(False, 'API should respond with 500')

View File

@ -86,3 +86,16 @@ class TestSingleLog(base.BaseLogsTestCase):
        response = self._run_and_wait(sid, message, headers=headers)

        self.assertEqual('production', response[0]['_source']['environment'])
        self.assertEqual('WebServer01', response[0]['_source']['server'])
    # TODO(trebski) following test not passing - failed to retrieve
    # big message from elasticsearch

    # @test.attr(type='gate')
    # def test_should_truncate_big_message(self):
    #     message_size = base._get_message_size(0.9999)
    #     sid, message = base.generate_unique_message(size=message_size)
    #
    #     headers = base._get_headers(self.logs_client.get_headers())
    #     response = self._run_and_wait(sid, message, headers=headers)
    #
    #     self.assertTrue(False, 'API should respond with 500')