Sync to never monasca-common
Using rest utils and new key-aware method from kafka producer. Change-Id: I92ca885237f12cc3790fa0935362f9e833b15287
This commit is contained in:
parent
16481d397c
commit
cb54d3e496
|
@ -1,76 +0,0 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import falcon
|
||||
import simplejson as json
|
||||
|
||||
ENCODING = 'utf8'
|
||||
|
||||
|
||||
def read_body(payload, content_type='application/json'):
|
||||
"""Reads HTTP payload according to given content_type.
|
||||
|
||||
Function is capable of reading from payload stream.
|
||||
Read data is then processed according to content_type.
|
||||
|
||||
Note:
|
||||
There is no transformation if content type is equal to
|
||||
'text/plain'. What has been read is returned.
|
||||
|
||||
:param payload(stream): payload to read
|
||||
:param content_type(str): payload content type
|
||||
:return: read data, returned type depends on content_type
|
||||
|
||||
:exception: :py:class:`falcon.HTTPBadRequest` - in case of any failure when
|
||||
reading data
|
||||
|
||||
"""
|
||||
try:
|
||||
content = payload.read()
|
||||
if not content:
|
||||
return False
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPBadRequest(title='Failed to read body',
|
||||
description=ex.message)
|
||||
|
||||
if content_type == 'application/json':
|
||||
try:
|
||||
content = from_json(content)
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPBadRequest(title='Failed to read body as json',
|
||||
description=ex.message)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
def as_json(data):
|
||||
"""Writes data as json.
|
||||
|
||||
:param data(dict): data to convert to json
|
||||
:return (str): json string
|
||||
"""
|
||||
return json.dumps(data,
|
||||
encoding=ENCODING,
|
||||
sort_keys=False,
|
||||
ensure_ascii=False)
|
||||
|
||||
|
||||
def from_json(data):
|
||||
"""Reads data from json str.
|
||||
|
||||
:param data(str): data to read
|
||||
:return (dict): read data
|
||||
"""
|
||||
return json.loads(data, encoding=ENCODING)
|
|
@ -19,7 +19,7 @@ import unittest
|
|||
|
||||
from falcon import testing
|
||||
import mock
|
||||
import simplejson
|
||||
import ujson
|
||||
|
||||
from monasca_log_api.tests import base
|
||||
from monasca_log_api.v2.common import log_publisher
|
||||
|
@ -174,8 +174,9 @@ class TestSendMessage(testing.TestBase):
|
|||
instance = log_publisher.LogPublisher()
|
||||
instance._kafka_publisher = mock.Mock()
|
||||
instance.send_message({})
|
||||
expected_key = 'some_key'
|
||||
instance._build_key = mock.Mock(name='_build_key',
|
||||
return_value='some_key')
|
||||
return_value=expected_key)
|
||||
|
||||
creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
|
||||
.total_seconds())
|
||||
|
@ -203,8 +204,8 @@ class TestSendMessage(testing.TestBase):
|
|||
|
||||
instance._kafka_publisher.publish.assert_called_once_with(
|
||||
self.conf.conf.log_publisher.topics[0],
|
||||
# 'some_key', # TODO(feature) next version of monasca-common
|
||||
simplejson.dumps(msg))
|
||||
ujson.dumps(msg),
|
||||
expected_key)
|
||||
|
||||
@mock.patch('monasca_log_api.v2.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
|
@ -215,8 +216,9 @@ class TestSendMessage(testing.TestBase):
|
|||
instance = log_publisher.LogPublisher()
|
||||
instance._kafka_publisher = mock.Mock()
|
||||
instance.send_message({})
|
||||
expected_key = 'some_key'
|
||||
instance._build_key = mock.Mock(name='_build_key',
|
||||
return_value='some_key')
|
||||
return_value=expected_key)
|
||||
|
||||
creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
|
||||
.total_seconds())
|
||||
|
@ -239,7 +241,7 @@ class TestSendMessage(testing.TestBase):
|
|||
'tenantId': 1
|
||||
}
|
||||
}
|
||||
json_msg = simplejson.dumps(msg)
|
||||
json_msg = ujson.dumps(msg)
|
||||
|
||||
instance.send_message(msg)
|
||||
|
||||
|
@ -248,5 +250,5 @@ class TestSendMessage(testing.TestBase):
|
|||
for topic in topics:
|
||||
instance._kafka_publisher.publish.assert_any_call(
|
||||
topic,
|
||||
# 'some_key', # TODO(feature) next version of monasca-common
|
||||
json_msg)
|
||||
json_msg,
|
||||
expected_key)
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
import falcon
|
||||
import mock
|
||||
import simplejson
|
||||
|
||||
from monasca_log_api.api import rest_utils as ru
|
||||
|
||||
|
||||
class RestUtilsTest(unittest.TestCase):
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_read_text_for_plain_text(self, payload):
|
||||
raw_msg = 'Hello World'
|
||||
msg = u''.join(raw_msg)
|
||||
|
||||
payload.read.return_value = raw_msg
|
||||
|
||||
self.assertEqual(msg, ru.read_body(payload, 'text/plain'))
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_read_json_for_application_json(self, payload):
|
||||
raw_msg = u'{"path":"/var/log/messages","message":"This is message"}'
|
||||
json_msg = simplejson.loads(raw_msg, encoding='utf-8')
|
||||
|
||||
payload.read.return_value = raw_msg
|
||||
|
||||
self.assertEqual(json_msg, ru.read_body(payload, 'application/json'))
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_fail_read_text_for_application_json(self, payload):
|
||||
with self.assertRaises(falcon.HTTPBadRequest) as context:
|
||||
raw_msg = 'Hello World'
|
||||
payload.read.return_value = raw_msg
|
||||
ru.read_body(payload, 'application/json')
|
||||
|
||||
self.assertEqual(context.exception.title,
|
||||
'Failed to read body as json')
|
|
@ -14,9 +14,9 @@
|
|||
# under the License.
|
||||
|
||||
from monasca_common.kafka import producer
|
||||
from monasca_common.rest import utils as rest_utils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import simplejson as json
|
||||
|
||||
from monasca_log_api.v2.common import service
|
||||
|
||||
|
@ -62,6 +62,7 @@ class LogPublisher(object):
|
|||
.. _monasca_common: https://github.com/openstack/monasca-common
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._topics = CONF.log_publisher.topics
|
||||
self._kafka_publisher = None
|
||||
|
@ -157,20 +158,17 @@ class LogPublisher(object):
|
|||
raise InvalidMessageException()
|
||||
|
||||
key = self._build_key(message['meta']['tenantId'], message['log'])
|
||||
msg = json.dumps(message,
|
||||
sort_keys=False,
|
||||
ensure_ascii=False).encode('utf8')
|
||||
msg = rest_utils.as_json(message).encode('utf8')
|
||||
|
||||
service.Validations.validate_envelope_size(msg)
|
||||
|
||||
# TODO(feature) next version of monasca-common
|
||||
LOG.debug('Build key [%s] for message' % key)
|
||||
LOG.debug('Sending message {topics=%s,key=%s,message=%s}' %
|
||||
(self._topics, key, msg))
|
||||
LOG.debug('Build key [%s] for message', key)
|
||||
LOG.debug('Sending message {topics=%s,key=%s,message=%s}',
|
||||
self._topics, key, msg)
|
||||
|
||||
try:
|
||||
for topic in self._topics:
|
||||
self._publisher().publish(topic, msg)
|
||||
self._publisher().publish(topic, msg, key)
|
||||
except Exception as ex:
|
||||
LOG.error(ex.message)
|
||||
raise ex
|
||||
|
|
|
@ -18,12 +18,12 @@ import re
|
|||
import sys
|
||||
|
||||
from falcon import errors as falcon_errors
|
||||
from monasca_common.rest import utils as rest_utils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_log_api.api import exceptions
|
||||
from monasca_log_api.api import logs_api
|
||||
from monasca_log_api.api import rest_utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
# under the License.
|
||||
|
||||
import falcon
|
||||
from monasca_common.rest import utils as rest_utils
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_log_api.api import rest_utils
|
||||
from monasca_log_api.api import versions_api
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
|
|
@ -12,5 +12,5 @@ pbr>=1.6.0
|
|||
six>=1.9.0
|
||||
simplejson>=2.2.0
|
||||
simport
|
||||
monasca-common>=0.0.2
|
||||
eventlet>=0.17.4,!=0.18.0
|
||||
monasca-common>=0.0.6
|
||||
eventlet>=0.17.4,!=0.18.0
|
||||
|
|
Loading…
Reference in New Issue