Fixes the Kafka publisher

The Kafka publisher is not concurrency-safe at all, and sample/event
payloads cannot be serialized correctly.

To fix that:
* the queueing and retry code is now shared with the messaging publisher.
* the connection to Kafka is established before messages are sent, so a
  connection failure never touches the local queue (see the sketch below).
* samples are serialized with jsonutils instead of json, so payloads that
  contain datetimes and other non-JSON-native types serialize correctly.
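
A minimal sketch of the resulting connect-before-send flow, distilled from
the kafka_broker.py diff below. kafka.KafkaClient, kafka.SimpleProducer and
jsonutils.dumps are the APIs the patch actually calls; the KafkaSender class
and its send() method are illustrative names only, not part of the patch:

    import kafka
    from oslo_serialization import jsonutils


    class KafkaSender(object):
        """Illustrative stand-in for the patched publisher's send path."""

        def __init__(self, host, port, topic):
            self._producer = None
            self._host, self._port, self._topic = host, port, topic

        def _ensure_connection(self):
            # Connect lazily: a failure here is raised *before* the caller
            # appends anything to its local queue, so the queue stays
            # consistent and the 'queue' policy can safely retry later.
            if self._producer:
                return
            client = kafka.KafkaClient("%s:%s" % (self._host, self._port))
            self._producer = kafka.SimpleProducer(client)

        def send(self, data):
            self._ensure_connection()
            for d in data:
                # jsonutils.dumps handles datetimes and other types that
                # the stdlib json module rejects.
                self._producer.send_messages(self._topic, jsonutils.dumps(d))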

Change-Id: I3fb731d2eb33cbfba38c5165ce9874af89072e34
Closes-Bug: #1479976
Mehdi Abaakouk 2015-07-31 09:00:20 +02:00
parent 49f53f35a5
commit fb0601a90d
4 changed files with 100 additions and 178 deletions

View File

@@ -13,24 +13,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import kafka
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import netutils
from six.moves.urllib import parse as urlparse
from ceilometer.i18n import _LE
from ceilometer.i18n import _LI
from ceilometer.i18n import _LW
from ceilometer import publisher
from ceilometer.publisher import utils
from ceilometer.publisher import messaging
LOG = log.getLogger(__name__)
class KafkaBrokerPublisher(publisher.PublisherBase):
class KafkaBrokerPublisher(messaging.MessagingPublisher):
"""Publish metering data to kafka broker.
The ip address and port number of kafka broker should be configured in
@@ -68,132 +63,34 @@ class KafkaBrokerPublisher(publisher.PublisherBase):
"""
def __init__(self, parsed_url):
self.kafka_client = None
super(KafkaBrokerPublisher, self).__init__(parsed_url)
options = urlparse.parse_qs(parsed_url.query)
self.host, self.port = netutils.parse_host_port(
self._producer = None
self._host, self._port = netutils.parse_host_port(
parsed_url.netloc, default_port=9092)
self._topic = options.get('topic', ['ceilometer'])[-1]
self.max_retry = int(options.get('max_retry', [100])[-1])
self.local_queue = []
params = urlparse.parse_qs(parsed_url.query)
self.topic = params.get('topic', ['ceilometer'])[-1]
self.policy = params.get('policy', ['default'])[-1]
self.max_queue_length = int(params.get(
'max_queue_length', [1024])[-1])
self.max_retry = int(params.get('max_retry', [100])[-1])
if self.policy in ['default', 'drop', 'queue']:
LOG.info(_LI('Publishing policy set to %s') % self.policy)
else:
LOG.warn(_LW('Publishing policy is unknown (%s) force to default')
% self.policy)
self.policy = 'default'
def _ensure_connection(self):
if self._producer:
return
try:
self._get_client()
client = kafka.KafkaClient("%s:%s" % (self._host, self._port))
self._producer = kafka.SimpleProducer(client)
except Exception as e:
LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
def publish_samples(self, context, samples):
"""Send a metering message for kafka broker.
:param context: Execution context from the service or RPC call
:param samples: Samples from pipeline after transformation
"""
samples_list = [
utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
for sample in samples
]
self.local_queue.append(samples_list)
try:
self._check_kafka_connection()
except Exception as e:
raise e
self.flush()
def flush(self):
queue = self.local_queue
self.local_queue = self._process_queue(queue)
if self.policy == 'queue':
self._check_queue_length()
def publish_events(self, context, events):
"""Send an event message for kafka broker.
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
events_list = [utils.message_from_event(
event, cfg.CONF.publisher.telemetry_secret) for event in events]
self.local_queue.append(events_list)
try:
self._check_kafka_connection()
except Exception as e:
raise e
self.flush()
def _process_queue(self, queue):
current_retry = 0
while queue:
data = queue[0]
try:
self._send(data)
except Exception:
LOG.warn(_LW("Failed to publish %d datum"),
sum([len(d) for d in queue]))
if self.policy == 'queue':
return queue
elif self.policy == 'drop':
return []
current_retry += 1
if current_retry >= self.max_retry:
self.local_queue = []
LOG.exception(_LE("Failed to retry to send sample data "
"with max_retry times"))
raise
else:
queue.pop(0)
return []
def _check_queue_length(self):
queue_length = len(self.local_queue)
if queue_length > self.max_queue_length > 0:
diff = queue_length - self.max_queue_length
self.local_queue = self.local_queue[diff:]
LOG.warn(_LW("Kafka Publisher max local queue length is exceeded, "
"dropping %d oldest data") % diff)
def _check_kafka_connection(self):
try:
self._get_client()
except Exception as e:
LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
if self.policy == 'queue':
self._check_queue_length()
else:
self.local_queue = []
raise Exception('Kafka Client is not available, '
raise messaging.DeliveryFailure('Kafka Client is not available, '
'please restart Kafka client')
def _get_client(self):
if not self.kafka_client:
self.kafka_client = kafka.KafkaClient(
"%s:%s" % (self.host, self.port))
self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
def _send(self, data):
for d in data:
def _send(self, context, event_type, data):
self._ensure_connection()
# TODO(sileht): don't split the payload into multiple network
# message ... but how to do that without breaking consuming
# application...
try:
self.kafka_producer.send_messages(
self.topic, json.dumps(d))
for d in data:
self._producer.send_messages(self._topic, jsonutils.dumps(d))
except Exception as e:
LOG.exception(_LE("Failed to send sample data: %s"), e)
raise
messaging.raise_delivery_failure(e)
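
With queueing, policies and retries now inherited from MessagingPublisher,
the publisher is configured entirely through its URL. A usage sketch under
the options parsed above (topic and max_retry here, policy and
max_queue_length in the base class); it assumes an environment where the
ceilometer modules import:

    from oslo_utils import netutils

    from ceilometer.publisher import kafka_broker

    # All query options are optional; these values are examples only.
    publisher = kafka_broker.KafkaBrokerPublisher(netutils.urlsplit(
        'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue&max_retry=5'))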

View File

@@ -22,10 +22,12 @@ import operator
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_utils import encodeutils
from oslo_utils import excutils
import six
import six.moves.urllib.parse as urlparse
from ceilometer.i18n import _
from ceilometer.i18n import _, _LE
from ceilometer import messaging
from ceilometer import publisher
from ceilometer.publisher import utils
@@ -67,6 +69,18 @@ cfg.CONF.register_opts(NOTIFIER_OPTS,
cfg.CONF.import_opt('host', 'ceilometer.service')
class DeliveryFailure(Exception):
def __init__(self, message=None, cause=None):
super(DeliveryFailure, self).__init__(message)
self.cause = cause
def raise_delivery_failure(exc):
excutils.raise_with_cause(DeliveryFailure,
encodeutils.exception_to_unicode(exc),
cause=exc)
@six.add_metaclass(abc.ABCMeta)
class MessagingPublisher(publisher.PublisherBase):
@@ -81,6 +95,7 @@ class MessagingPublisher(publisher.PublisherBase):
self.policy = options.get('policy', ['default'])[-1]
self.max_queue_length = int(options.get(
'max_queue_length', [1024])[-1])
self.max_retry = 0
self.local_queue = []
@@ -144,11 +159,12 @@ class MessagingPublisher(publisher.PublisherBase):
"dropping %d oldest samples") % count)
def _process_queue(self, queue, policy):
current_retry = 0
while queue:
context, topic, data = queue[0]
try:
self._send(context, topic, data)
except oslo_messaging.MessageDeliveryFailure:
except DeliveryFailure:
data = sum([len(m) for __, __, m in queue])
if policy == 'queue':
LOG.warn(_("Failed to publish %d datapoints, queue them"),
@@ -158,7 +174,10 @@ class MessagingPublisher(publisher.PublisherBase):
LOG.warn(_("Failed to publish %d datapoints, "
"dropping them"), data)
return []
# default, occur only if rabbit_max_retries > 0
current_retry += 1
if current_retry >= self.max_retry:
LOG.exception(_LE("Failed to retry to send sample data "
"with max_retry times"))
raise
else:
queue.pop(0)
@@ -195,8 +214,11 @@ class RPCPublisher(MessagingPublisher):
)
def _send(self, context, topic, meters):
try:
self.rpc_client.prepare(topic=topic).cast(context, self.target,
data=meters)
except oslo_messaging.MessageDeliveryFailure as e:
raise_delivery_failure(e)
class NotifierPublisher(MessagingPublisher):
@@ -213,8 +235,11 @@ class NotifierPublisher(MessagingPublisher):
)
def _send(self, context, event_type, data):
try:
self.notifier.sample(context.to_dict(), event_type=event_type,
payload=data)
except oslo_messaging.MessageDeliveryFailure as e:
raise_delivery_failure(e)
class SampleNotifierPublisher(NotifierPublisher):
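
DeliveryFailure and raise_delivery_failure, added above, give every
transport (RPC, notifier, Kafka) one failure type to raise and the policy
code one type to catch. A self-contained sketch of the wrapping behaviour,
assuming only oslo.utils is installed (the RuntimeError is a made-up
stand-in for a real transport error):

    from oslo_utils import encodeutils
    from oslo_utils import excutils


    class DeliveryFailure(Exception):
        def __init__(self, message=None, cause=None):
            super(DeliveryFailure, self).__init__(message)
            self.cause = cause


    def raise_delivery_failure(exc):
        # Re-raise any transport error as DeliveryFailure while keeping
        # the original exception reachable through the 'cause' attribute.
        excutils.raise_with_cause(DeliveryFailure,
                                  encodeutils.exception_to_unicode(exc),
                                  cause=exc)


    try:
        raise_delivery_failure(RuntimeError("broker unreachable"))
    except DeliveryFailure as failure:
        print(failure)        # broker unreachable
        print(failure.cause)  # the original RuntimeError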

View File

@@ -22,12 +22,14 @@ from oslo_utils import netutils
from ceilometer.event.storage import models as event
from ceilometer.publisher import kafka_broker as kafka
from ceilometer.publisher import messaging as msg_publisher
from ceilometer import sample
from ceilometer.tests import base as tests_base
@mock.patch('ceilometer.publisher.kafka_broker.LOG', mock.Mock())
@mock.patch.object(kafka.KafkaBrokerPublisher, '_get_client', mock.Mock())
@mock.patch('ceilometer.publisher.kafka_broker.kafka.KafkaClient',
mock.Mock())
class TestKafkaPublisher(tests_base.BaseTestCase):
test_event_data = [
event.Event(message_id=uuid.uuid4(),
@@ -95,25 +97,22 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
),
]
def setUp(self):
super(TestKafkaPublisher, self).setUp()
def test_publish(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer'))
with mock.patch.object(publisher, '_send') as fake_send:
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(mock.MagicMock(), self.test_data)
self.assertEqual(1, len(fake_send.mock_calls))
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_without_options(self):
publisher = kafka.KafkaBrokerPublisher(
netutils.urlsplit('kafka://127.0.0.1:9092'))
with mock.patch.object(publisher, '_send') as fake_send:
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(mock.MagicMock(), self.test_data)
self.assertEqual(1, len(fake_send.mock_calls))
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_without_policy(self):
@@ -129,39 +128,40 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=default'))
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = TypeError
self.assertRaises(TypeError, publisher.publish_samples,
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = TypeError
self.assertRaises(msg_publisher.DeliveryFailure,
publisher.publish_samples,
mock.MagicMock(), self.test_data)
self.assertEqual(100, len(fake_send.mock_calls))
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_drop_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop'))
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = Exception("test")
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
publisher.publish_samples(mock.MagicMock(), self.test_data)
self.assertEqual(1, len(fake_send.mock_calls))
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_queue_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = Exception("test")
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
publisher.publish_samples(mock.MagicMock(), self.test_data)
self.assertEqual(1, len(fake_send.mock_calls))
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(1, len(publisher.local_queue))
def test_publish_to_down_host_with_default_queue_size(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = Exception('No Connection')
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
for i in range(0, 2000):
for s in self.test_data:
@@ -170,16 +170,16 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(1024, len(publisher.local_queue))
self.assertEqual('test-976',
publisher.local_queue[0][0]['counter_name'])
publisher.local_queue[0][2][0]['counter_name'])
self.assertEqual('test-1999',
publisher.local_queue[1023][0]['counter_name'])
publisher.local_queue[1023][2][0]['counter_name'])
def test_publish_to_host_from_down_to_up_with_queue(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = Exception('No Connection')
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
for i in range(0, 16):
for s in self.test_data:
s.name = 'test-%d' % i
@@ -187,7 +187,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(16, len(publisher.local_queue))
fake_send.side_effect = None
fake_producer.send_messages.side_effect = None
for s in self.test_data:
s.name = 'test-%d' % 16
publisher.publish_samples(mock.MagicMock(), self.test_data)
@@ -197,13 +197,14 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
publisher = kafka.KafkaBrokerPublisher(
netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer'))
with mock.patch.object(publisher, '_send') as fake_send:
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_events(mock.MagicMock(), self.test_event_data)
self.assertEqual(1, len(fake_send.mock_calls))
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = TypeError
self.assertRaises(TypeError, publisher.publish_events,
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
self.assertRaises(msg_publisher.DeliveryFailure,
publisher.publish_events,
mock.MagicMock(), self.test_event_data)
self.assertEqual(100, len(fake_send.mock_calls))
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))

View File

@@ -21,7 +21,6 @@ import eventlet
import mock
from oslo_config import fixture as fixture_config
from oslo_context import context
import oslo_messaging
from oslo_utils import netutils
import testscenarios.testcase
@@ -250,11 +249,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_no_policy(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
oslo_messaging.MessageDeliveryFailure,
msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called)
@@ -267,11 +266,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_policy_block(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=default' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
oslo_messaging.MessageDeliveryFailure,
msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called)
@@ -283,11 +282,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_policy_incorrect(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=notexist' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
oslo_messaging.MessageDeliveryFailure,
msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.warn.called)
@@ -303,7 +302,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_drop_and_rpc_down(self):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=drop' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
@@ -315,7 +314,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_queue_and_rpc_down(self):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
@@ -330,7 +329,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
@@ -354,7 +353,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(netutils.urlsplit(
'%s://?policy=queue&max_queue_length=3' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
for i in range(0, 5):
@@ -381,7 +380,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = oslo_messaging.MessageDeliveryFailure()
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
for i in range(0, 2000):