Merge "Moving driver to new kafka-python version"
This commit is contained in:
commit
74f5dd5c1c
@ -11,64 +11,77 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
# Following code fixes 2 issues with kafka-python and
|
||||||
|
# The current release of eventlet (0.19.0) does not actually remove
|
||||||
|
# select.poll [1]. Because of kafka-python.selectors34 selects
|
||||||
|
# PollSelector instead of SelectSelector [2]. PollSelector relies on
|
||||||
|
# select.poll, which does not work when eventlet/greenlet is used. This
|
||||||
|
# bug in evenlet is fixed in the master branch [3], but there's no
|
||||||
|
# release of eventlet that includes this fix at this point.
|
||||||
|
|
||||||
|
import json
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
import kafka
|
||||||
|
from kafka.client_async import selectors
|
||||||
|
import kafka.errors
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
from oslo_utils import eventletutils
|
||||||
|
import tenacity
|
||||||
|
|
||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
from oslo_messaging._drivers import common as driver_common
|
from oslo_messaging._drivers import common as driver_common
|
||||||
|
from oslo_messaging._drivers import kafka_options
|
||||||
from oslo_messaging._drivers import pool as driver_pool
|
from oslo_messaging._drivers import pool as driver_pool
|
||||||
from oslo_messaging._i18n import _LE
|
from oslo_messaging._i18n import _LE
|
||||||
from oslo_messaging._i18n import _LW
|
from oslo_messaging._i18n import _LW
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
|
|
||||||
import kafka
|
if eventletutils.is_monkey_patched('select'):
|
||||||
from kafka.common import KafkaError
|
# monkeypatch the vendored SelectSelector._select like eventlet does
|
||||||
from oslo_config import cfg
|
# https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
|
||||||
from oslo_log import log as logging
|
from eventlet.green import select
|
||||||
|
selectors.SelectSelector._select = staticmethod(select.select)
|
||||||
|
|
||||||
|
# Force to use the select selectors
|
||||||
|
KAFKA_SELECTOR = selectors.SelectSelector
|
||||||
|
else:
|
||||||
|
KAFKA_SELECTOR = selectors.DefaultSelector
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
PURPOSE_SEND = 'send'
|
|
||||||
PURPOSE_LISTEN = 'listen'
|
|
||||||
|
|
||||||
kafka_opts = [
|
def unpack_message(msg):
|
||||||
cfg.StrOpt('kafka_default_host', default='localhost',
|
context = {}
|
||||||
deprecated_for_removal=True,
|
message = None
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
try:
|
||||||
help='Default Kafka broker Host'),
|
if msg:
|
||||||
|
msg = json.loads(msg)
|
||||||
cfg.PortOpt('kafka_default_port', default=9092,
|
message = driver_common.deserialize_msg(msg)
|
||||||
deprecated_for_removal=True,
|
if 'context' in message:
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
context = message['context']
|
||||||
help='Default Kafka broker Port'),
|
del message['context']
|
||||||
|
except ValueError as e:
|
||||||
cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024,
|
LOG.info("Invalid format of consumed message: %s" % e)
|
||||||
help='Max fetch bytes of Kafka consumer'),
|
except Exception:
|
||||||
|
LOG.warning(_LW("Exception during message unpacking"))
|
||||||
cfg.IntOpt('kafka_consumer_timeout', default=1.0,
|
return message, context
|
||||||
help='Default timeout(s) for Kafka consumers'),
|
|
||||||
|
|
||||||
cfg.IntOpt('pool_size', default=10,
|
|
||||||
help='Pool Size for Kafka Consumers'),
|
|
||||||
|
|
||||||
cfg.IntOpt('conn_pool_min_size', default=2,
|
|
||||||
help='The pool size limit for connections expiration policy'),
|
|
||||||
|
|
||||||
cfg.IntOpt('conn_pool_ttl', default=1200,
|
|
||||||
help='The time-to-live in sec of idle connections in the pool')
|
|
||||||
]
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
|
|
||||||
|
|
||||||
def pack_context_with_message(ctxt, msg):
|
def pack_message(ctxt, msg):
|
||||||
"""Pack context into msg."""
|
"""Pack context into msg."""
|
||||||
|
|
||||||
if isinstance(ctxt, dict):
|
if isinstance(ctxt, dict):
|
||||||
context_d = ctxt
|
context_d = ctxt
|
||||||
else:
|
else:
|
||||||
context_d = ctxt.to_dict()
|
context_d = ctxt.to_dict()
|
||||||
|
msg['context'] = context_d
|
||||||
|
|
||||||
return {'message': msg, 'context': context_d}
|
msg = driver_common.serialize_msg(msg)
|
||||||
|
|
||||||
|
return msg
|
||||||
|
|
||||||
|
|
||||||
def target_to_topic(target, priority=None):
|
def target_to_topic(target, priority=None):
|
||||||
@ -84,18 +97,67 @@ def target_to_topic(target, priority=None):
|
|||||||
return target.topic + '.' + priority
|
return target.topic + '.' + priority
|
||||||
|
|
||||||
|
|
||||||
|
def retry_on_retriable_kafka_error(exc):
|
||||||
|
return (isinstance(exc, kafka.errors.KafkaError) and exc.retriable)
|
||||||
|
|
||||||
|
|
||||||
|
def with_reconnect(retries=None):
|
||||||
|
def decorator(func):
|
||||||
|
@tenacity.retry(
|
||||||
|
retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error),
|
||||||
|
wait=tenacity.wait_fixed(1),
|
||||||
|
stop=tenacity.stop_after_attempt(retries),
|
||||||
|
reraise=True
|
||||||
|
)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
return wrapper
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
|
class Producer(object):
|
||||||
|
_producer = None
|
||||||
|
_servers = None
|
||||||
|
_lock = threading.Lock()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@with_reconnect()
|
||||||
|
def connect(servers, **kwargs):
|
||||||
|
return kafka.KafkaProducer(
|
||||||
|
bootstrap_servers=servers,
|
||||||
|
selector=KAFKA_SELECTOR,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def producer(cls, servers, **kwargs):
|
||||||
|
with cls._lock:
|
||||||
|
if not cls._producer or cls._servers != servers:
|
||||||
|
cls._servers = servers
|
||||||
|
cls._producer = cls.connect(servers, **kwargs)
|
||||||
|
return cls._producer
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def cleanup(cls):
|
||||||
|
with cls._lock:
|
||||||
|
if cls._producer:
|
||||||
|
cls._producer.close()
|
||||||
|
cls._producer = None
|
||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
|
|
||||||
def __init__(self, conf, url, purpose):
|
def __init__(self, conf, url, purpose):
|
||||||
|
|
||||||
|
self.client = None
|
||||||
driver_conf = conf.oslo_messaging_kafka
|
driver_conf = conf.oslo_messaging_kafka
|
||||||
|
self.batch_size = driver_conf.producer_batch_size
|
||||||
|
self.linger_ms = driver_conf.producer_batch_timeout * 1000
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.kafka_client = None
|
|
||||||
self.producer = None
|
self.producer = None
|
||||||
self.consumer = None
|
self.consumer = None
|
||||||
self.fetch_messages_max_bytes = driver_conf.kafka_max_fetch_bytes
|
|
||||||
self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
|
self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
|
||||||
|
self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
|
||||||
|
self.group_id = driver_conf.consumer_group
|
||||||
self.url = url
|
self.url = url
|
||||||
self._parse_url()
|
self._parse_url()
|
||||||
# TODO(Support for manual/auto_commit functionality)
|
# TODO(Support for manual/auto_commit functionality)
|
||||||
@ -107,7 +169,6 @@ class Connection(object):
|
|||||||
|
|
||||||
def _parse_url(self):
|
def _parse_url(self):
|
||||||
driver_conf = self.conf.oslo_messaging_kafka
|
driver_conf = self.conf.oslo_messaging_kafka
|
||||||
|
|
||||||
self.hostaddrs = []
|
self.hostaddrs = []
|
||||||
|
|
||||||
for host in self.url.hosts:
|
for host in self.url.hosts:
|
||||||
@ -128,65 +189,46 @@ class Connection(object):
|
|||||||
:param msg: messages for publishing
|
:param msg: messages for publishing
|
||||||
:param retry: the number of retry
|
:param retry: the number of retry
|
||||||
"""
|
"""
|
||||||
message = pack_context_with_message(ctxt, msg)
|
|
||||||
|
message = pack_message(ctxt, msg)
|
||||||
self._ensure_connection()
|
self._ensure_connection()
|
||||||
self._send_and_retry(message, topic, retry)
|
self._send_and_retry(message, topic, retry)
|
||||||
|
|
||||||
def _send_and_retry(self, message, topic, retry):
|
def _send_and_retry(self, message, topic, retry):
|
||||||
current_retry = 0
|
|
||||||
if not isinstance(message, str):
|
if not isinstance(message, str):
|
||||||
message = jsonutils.dumps(message)
|
message = jsonutils.dumps(message)
|
||||||
while message is not None:
|
retry = retry if retry >= 0 else None
|
||||||
try:
|
|
||||||
self._send(message, topic)
|
|
||||||
message = None
|
|
||||||
except Exception:
|
|
||||||
LOG.warning(_LW("Failed to publish a message of topic %s"),
|
|
||||||
topic)
|
|
||||||
current_retry += 1
|
|
||||||
if retry is not None and current_retry >= retry:
|
|
||||||
LOG.exception(_LE("Failed to retry to send data "
|
|
||||||
"with max retry times"))
|
|
||||||
message = None
|
|
||||||
|
|
||||||
def _send(self, message, topic):
|
@with_reconnect(retries=retry)
|
||||||
self.producer.send_messages(topic, message)
|
def _send(topic, message):
|
||||||
|
self.producer.send(topic, message)
|
||||||
|
|
||||||
|
try:
|
||||||
|
_send(topic, message)
|
||||||
|
except Exception:
|
||||||
|
Producer.cleanup()
|
||||||
|
LOG.exception(_LE("Failed to send message"))
|
||||||
|
|
||||||
|
@with_reconnect()
|
||||||
|
def _poll_messages(self, timeout):
|
||||||
|
return self.consumer.poll(timeout)
|
||||||
|
|
||||||
def consume(self, timeout=None):
|
def consume(self, timeout=None):
|
||||||
"""Receive up to 'max_fetch_messages' messages.
|
"""Receive up to 'max_fetch_messages' messages.
|
||||||
|
|
||||||
:param timeout: poll timeout in seconds
|
:param timeout: poll timeout in seconds
|
||||||
"""
|
"""
|
||||||
duration = (self.consumer_timeout if timeout is None else timeout)
|
|
||||||
timer = driver_common.DecayingTimer(duration=duration)
|
|
||||||
timer.start()
|
|
||||||
|
|
||||||
def _raise_timeout():
|
|
||||||
LOG.debug('Timed out waiting for Kafka response')
|
|
||||||
raise driver_common.Timeout()
|
|
||||||
|
|
||||||
poll_timeout = (self.consumer_timeout if timeout is None
|
|
||||||
else min(timeout, self.consumer_timeout))
|
|
||||||
|
|
||||||
while True:
|
|
||||||
if self._consume_loop_stopped:
|
if self._consume_loop_stopped:
|
||||||
return
|
return None
|
||||||
|
|
||||||
|
timeout = timeout if timeout >= 0 else self.consumer_timeout
|
||||||
try:
|
try:
|
||||||
next_timeout = poll_timeout * 1000.0
|
messages = self._poll_messages(timeout)
|
||||||
# TODO(use configure() method instead)
|
except kafka.errors.ConsumerTimeout as e:
|
||||||
# Currently KafkaConsumer does not support for
|
raise driver_common.Timeout(e.message)
|
||||||
# the case of updating only fetch_max_wait_ms parameter
|
except Exception:
|
||||||
self.consumer._config['fetch_max_wait_ms'] = next_timeout
|
LOG.exception(_LE("Failed to consume messages"))
|
||||||
messages = list(self.consumer.fetch_messages())
|
|
||||||
except Exception as e:
|
|
||||||
LOG.exception(_LE("Failed to consume messages: %s"), e)
|
|
||||||
messages = None
|
messages = None
|
||||||
|
|
||||||
if not messages:
|
|
||||||
poll_timeout = timer.check_return(
|
|
||||||
_raise_timeout, maximum=self.consumer_timeout)
|
|
||||||
continue
|
|
||||||
|
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
def stop_consuming(self):
|
def stop_consuming(self):
|
||||||
@ -194,16 +236,14 @@ class Connection(object):
|
|||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
"""Reset a connection so it can be used again."""
|
"""Reset a connection so it can be used again."""
|
||||||
if self.consumer:
|
pass
|
||||||
self.consumer.close()
|
|
||||||
self.consumer = None
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self.kafka_client:
|
|
||||||
self.kafka_client.close()
|
|
||||||
self.kafka_client = None
|
|
||||||
if self.producer:
|
if self.producer:
|
||||||
self.producer.stop()
|
self.producer.close()
|
||||||
|
self.producer = None
|
||||||
|
if self.consumer:
|
||||||
|
self.consumer.close()
|
||||||
self.consumer = None
|
self.consumer = None
|
||||||
|
|
||||||
def commit(self):
|
def commit(self):
|
||||||
@ -218,25 +258,22 @@ class Connection(object):
|
|||||||
self.consumer.commit()
|
self.consumer.commit()
|
||||||
|
|
||||||
def _ensure_connection(self):
|
def _ensure_connection(self):
|
||||||
if self.kafka_client:
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
self.kafka_client = kafka.KafkaClient(
|
self.producer = Producer.producer(self.hostaddrs,
|
||||||
self.hostaddrs)
|
linger_ms=self.linger_ms,
|
||||||
self.producer = kafka.SimpleProducer(self.kafka_client)
|
batch_size=self.batch_size)
|
||||||
except KafkaError as e:
|
except kafka.errors.KafkaError as e:
|
||||||
LOG.exception(_LE("Kafka Connection is not available: %s"), e)
|
LOG.exception(_LE("KafkaProducer could not be initialized: %s"), e)
|
||||||
self.kafka_client = None
|
raise
|
||||||
|
|
||||||
|
@with_reconnect()
|
||||||
def declare_topic_consumer(self, topics, group=None):
|
def declare_topic_consumer(self, topics, group=None):
|
||||||
self._ensure_connection()
|
|
||||||
for topic in topics:
|
|
||||||
self.kafka_client.ensure_topic_exists(topic)
|
|
||||||
self.consumer = kafka.KafkaConsumer(
|
self.consumer = kafka.KafkaConsumer(
|
||||||
*topics, group_id=group,
|
*topics, group_id=(group or self.group_id),
|
||||||
bootstrap_servers=self.hostaddrs,
|
bootstrap_servers=self.hostaddrs,
|
||||||
fetch_message_max_bytes=self.fetch_messages_max_bytes)
|
max_partition_fetch_bytes=self.max_fetch_bytes,
|
||||||
self._consume_loop_stopped = False
|
selector=KAFKA_SELECTOR
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class OsloKafkaMessage(base.RpcIncomingMessage):
|
class OsloKafkaMessage(base.RpcIncomingMessage):
|
||||||
@ -261,20 +298,26 @@ class KafkaListener(base.PollStyleListener):
|
|||||||
|
|
||||||
@base.batch_poll_helper
|
@base.batch_poll_helper
|
||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
|
# TODO(sileht): use batch capability of kafka
|
||||||
while not self._stopped.is_set():
|
while not self._stopped.is_set():
|
||||||
if self.incoming_queue:
|
if self.incoming_queue:
|
||||||
return self.incoming_queue.pop(0)
|
return self.incoming_queue.pop(0)
|
||||||
try:
|
try:
|
||||||
messages = self.conn.consume(timeout=timeout)
|
messages = self.conn.consume(timeout=timeout)
|
||||||
for msg in messages:
|
if messages:
|
||||||
message = msg.value
|
self._put_messages_to_queue(messages)
|
||||||
LOG.debug('poll got message : %s', message)
|
|
||||||
message = jsonutils.loads(message)
|
|
||||||
self.incoming_queue.append(OsloKafkaMessage(
|
|
||||||
ctxt=message['context'], message=message['message']))
|
|
||||||
except driver_common.Timeout:
|
except driver_common.Timeout:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _put_messages_to_queue(self, messages):
|
||||||
|
for topic, records in messages.items():
|
||||||
|
if records:
|
||||||
|
for record in records:
|
||||||
|
message, context = unpack_message(record.value)
|
||||||
|
if message:
|
||||||
|
self.incoming_queue.append(
|
||||||
|
OsloKafkaMessage(ctxt=context, message=message))
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._stopped.set()
|
self._stopped.set()
|
||||||
self.conn.stop_consuming()
|
self.conn.stop_consuming()
|
||||||
@ -302,7 +345,7 @@ class KafkaDriver(base.BaseDriver):
|
|||||||
opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
|
opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
|
||||||
title='Kafka driver options')
|
title='Kafka driver options')
|
||||||
conf.register_group(opt_group)
|
conf.register_group(opt_group)
|
||||||
conf.register_opts(kafka_opts, group=opt_group)
|
conf.register_opts(kafka_options.KAFKA_OPTS, group=opt_group)
|
||||||
|
|
||||||
super(KafkaDriver, self).__init__(
|
super(KafkaDriver, self).__init__(
|
||||||
conf, url, default_exchange, allowed_remote_exmods)
|
conf, url, default_exchange, allowed_remote_exmods)
|
||||||
@ -344,7 +387,7 @@ class KafkaDriver(base.BaseDriver):
|
|||||||
N means N retries
|
N means N retries
|
||||||
:type retry: int
|
:type retry: int
|
||||||
"""
|
"""
|
||||||
with self._get_connection(purpose=PURPOSE_SEND) as conn:
|
with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn:
|
||||||
conn.notify_send(target_to_topic(target), ctxt, message, retry)
|
conn.notify_send(target_to_topic(target), ctxt, message, retry)
|
||||||
|
|
||||||
def listen(self, target, batch_size, batch_timeout):
|
def listen(self, target, batch_size, batch_timeout):
|
||||||
@ -363,7 +406,7 @@ class KafkaDriver(base.BaseDriver):
|
|||||||
:param pool: consumer group of Kafka consumers
|
:param pool: consumer group of Kafka consumers
|
||||||
:type pool: string
|
:type pool: string
|
||||||
"""
|
"""
|
||||||
conn = self._get_connection(purpose=PURPOSE_LISTEN)
|
conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN)
|
||||||
topics = set()
|
topics = set()
|
||||||
for target, priority in targets_and_priorities:
|
for target, priority in targets_and_priorities:
|
||||||
topics.add(target_to_topic(target, priority))
|
topics.add(target_to_topic(target, priority))
|
||||||
|
52
oslo_messaging/_drivers/kafka_options.py
Normal file
52
oslo_messaging/_drivers/kafka_options.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
KAFKA_OPTS = [
|
||||||
|
cfg.StrOpt('kafka_default_host', default='localhost',
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
||||||
|
help='Default Kafka broker Host'),
|
||||||
|
|
||||||
|
cfg.PortOpt('kafka_default_port', default=9092,
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
||||||
|
help='Default Kafka broker Port'),
|
||||||
|
|
||||||
|
cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024,
|
||||||
|
help='Max fetch bytes of Kafka consumer'),
|
||||||
|
|
||||||
|
cfg.IntOpt('kafka_consumer_timeout', default=1.0,
|
||||||
|
help='Default timeout(s) for Kafka consumers'),
|
||||||
|
|
||||||
|
cfg.IntOpt('pool_size', default=10,
|
||||||
|
help='Pool Size for Kafka Consumers'),
|
||||||
|
|
||||||
|
cfg.IntOpt('conn_pool_min_size', default=2,
|
||||||
|
help='The pool size limit for connections expiration policy'),
|
||||||
|
|
||||||
|
cfg.IntOpt('conn_pool_ttl', default=1200,
|
||||||
|
help='The time-to-live in sec of idle connections in the pool'),
|
||||||
|
|
||||||
|
cfg.StrOpt('consumer_group', default="oslo_messaging_consumer",
|
||||||
|
help='Group id for Kafka consumer. Consumers in one group '
|
||||||
|
'will coordinate message consumption'),
|
||||||
|
|
||||||
|
cfg.FloatOpt('producer_batch_timeout', default=0.,
|
||||||
|
help="Upper bound on the delay for KafkaProducer batching "
|
||||||
|
"in seconds"),
|
||||||
|
|
||||||
|
cfg.IntOpt('producer_batch_size', default=16384,
|
||||||
|
help='Size of batch for the producer async send')
|
||||||
|
]
|
@ -26,6 +26,7 @@ from oslo_messaging._drivers import base as drivers_base
|
|||||||
from oslo_messaging._drivers import impl_pika
|
from oslo_messaging._drivers import impl_pika
|
||||||
from oslo_messaging._drivers import impl_rabbit
|
from oslo_messaging._drivers import impl_rabbit
|
||||||
from oslo_messaging._drivers.impl_zmq import zmq_options
|
from oslo_messaging._drivers.impl_zmq import zmq_options
|
||||||
|
from oslo_messaging._drivers import kafka_options
|
||||||
from oslo_messaging._drivers.pika_driver import pika_connection_factory
|
from oslo_messaging._drivers.pika_driver import pika_connection_factory
|
||||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
|
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
|
||||||
from oslo_messaging.notify import notifier
|
from oslo_messaging.notify import notifier
|
||||||
@ -53,6 +54,7 @@ _opts = [
|
|||||||
pika_connection_factory.pika_opts,
|
pika_connection_factory.pika_opts,
|
||||||
impl_pika.pika_pool_opts, impl_pika.message_opts,
|
impl_pika.pika_pool_opts, impl_pika.message_opts,
|
||||||
impl_pika.notification_opts, impl_pika.rpc_opts))),
|
impl_pika.notification_opts, impl_pika.rpc_opts))),
|
||||||
|
('oslo_messaging_kafka', kafka_options.KAFKA_OPTS),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -12,14 +12,12 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
import kafka
|
import kafka
|
||||||
from kafka.common import KafkaError
|
import kafka.errors
|
||||||
import mock
|
import mock
|
||||||
from oslo_serialization import jsonutils
|
|
||||||
import testscenarios
|
import testscenarios
|
||||||
import time
|
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging._drivers import common as driver_common
|
from oslo_messaging._drivers import common as common_driver
|
||||||
from oslo_messaging._drivers import impl_kafka as kafka_driver
|
from oslo_messaging._drivers import impl_kafka as kafka_driver
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
|
|
||||||
@ -63,7 +61,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
|
|||||||
self.addCleanup(transport.cleanup)
|
self.addCleanup(transport.cleanup)
|
||||||
driver = transport._driver
|
driver = transport._driver
|
||||||
|
|
||||||
conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
|
conn = driver._get_connection(common_driver.PURPOSE_SEND)
|
||||||
self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs)
|
self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs)
|
||||||
|
|
||||||
|
|
||||||
@ -76,6 +74,7 @@ class TestKafkaDriver(test_utils.BaseTestCase):
|
|||||||
self.messaging_conf.transport_driver = 'kafka'
|
self.messaging_conf.transport_driver = 'kafka'
|
||||||
transport = oslo_messaging.get_transport(self.conf)
|
transport = oslo_messaging.get_transport(self.conf)
|
||||||
self.driver = transport._driver
|
self.driver = transport._driver
|
||||||
|
self.addCleanup(kafka_driver.Producer.cleanup)
|
||||||
|
|
||||||
def test_send(self):
|
def test_send(self):
|
||||||
target = oslo_messaging.Target(topic="topic_test")
|
target = oslo_messaging.Target(topic="topic_test")
|
||||||
@ -85,16 +84,40 @@ class TestKafkaDriver(test_utils.BaseTestCase):
|
|||||||
def test_send_notification(self):
|
def test_send_notification(self):
|
||||||
target = oslo_messaging.Target(topic="topic_test")
|
target = oslo_messaging.Target(topic="topic_test")
|
||||||
|
|
||||||
with mock.patch.object(
|
with mock.patch("kafka.KafkaProducer") as fake_producer_class:
|
||||||
kafka_driver.Connection, 'notify_send') as fake_send:
|
fake_producer = fake_producer_class.return_value
|
||||||
self.driver.send_notification(target, {}, {}, None)
|
fake_producer.send.side_effect = kafka.errors.NoBrokersAvailable
|
||||||
self.assertEqual(1, len(fake_send.mock_calls))
|
self.driver.send_notification(target, {}, {"payload": ["test_1"]},
|
||||||
|
None, retry=3)
|
||||||
|
self.assertEqual(3, fake_producer.send.call_count)
|
||||||
|
|
||||||
def test_listen(self):
|
def test_listen(self):
|
||||||
target = oslo_messaging.Target(topic="topic_test")
|
target = oslo_messaging.Target(topic="topic_test")
|
||||||
self.assertRaises(NotImplementedError, self.driver.listen, target,
|
self.assertRaises(NotImplementedError, self.driver.listen, target,
|
||||||
None, None)
|
None, None)
|
||||||
|
|
||||||
|
def test_listen_for_notifications(self):
|
||||||
|
targets_and_priorities = [
|
||||||
|
(oslo_messaging.Target(topic="topic_test_1"), "sample"),
|
||||||
|
]
|
||||||
|
expected_topics = ["topic_test_1.sample"]
|
||||||
|
with mock.patch("kafka.KafkaConsumer") as consumer:
|
||||||
|
self.driver.listen_for_notifications(
|
||||||
|
targets_and_priorities, "kafka_test", 1000, 10)
|
||||||
|
consumer.assert_called_once_with(
|
||||||
|
*expected_topics, group_id="kafka_test",
|
||||||
|
bootstrap_servers=['localhost:9092'],
|
||||||
|
max_partition_fetch_bytes=mock.ANY,
|
||||||
|
selector=mock.ANY
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_cleanup(self):
|
||||||
|
listeners = [mock.MagicMock(), mock.MagicMock()]
|
||||||
|
self.driver.listeners.extend(listeners)
|
||||||
|
self.driver.cleanup()
|
||||||
|
for listener in listeners:
|
||||||
|
listener.close.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
class TestKafkaConnection(test_utils.BaseTestCase):
|
class TestKafkaConnection(test_utils.BaseTestCase):
|
||||||
|
|
||||||
@ -105,134 +128,9 @@ class TestKafkaConnection(test_utils.BaseTestCase):
|
|||||||
self.driver = transport._driver
|
self.driver = transport._driver
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
||||||
@mock.patch.object(kafka_driver.Connection, '_send')
|
@mock.patch.object(kafka_driver.Connection, '_send_and_retry')
|
||||||
def test_notify(self, fake_send, fake_ensure_connection):
|
def test_notify(self, fake_send, fake_ensure_connection):
|
||||||
conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
|
conn = self.driver._get_connection(common_driver.PURPOSE_SEND)
|
||||||
conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
|
conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
|
||||||
{"fake_text": "fake_message_1"}, 10)
|
{"fake_text": "fake_message_1"}, 10)
|
||||||
self.assertEqual(1, len(fake_send.mock_calls))
|
self.assertEqual(1, len(fake_send.mock_calls))
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_send')
|
|
||||||
def test_notify_with_retry(self, fake_send, fake_ensure_connection):
|
|
||||||
conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
|
|
||||||
fake_send.side_effect = KafkaError("fake_exception")
|
|
||||||
conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
|
|
||||||
{"fake_text": "fake_message_2"}, 10)
|
|
||||||
self.assertEqual(10, len(fake_send.mock_calls))
|
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_parse_url')
|
|
||||||
def test_consume(self, fake_parse_url, fake_ensure_connection):
|
|
||||||
fake_message = {
|
|
||||||
"context": {"fake": "fake_context_1"},
|
|
||||||
"message": {"fake": "fake_message_1"}}
|
|
||||||
|
|
||||||
conn = kafka_driver.Connection(
|
|
||||||
self.conf, '', kafka_driver.PURPOSE_LISTEN)
|
|
||||||
|
|
||||||
conn.consumer = mock.MagicMock()
|
|
||||||
conn.consumer.fetch_messages = mock.MagicMock(
|
|
||||||
return_value=iter([jsonutils.dumps(fake_message)]))
|
|
||||||
|
|
||||||
self.assertEqual(fake_message, jsonutils.loads(conn.consume()[0]))
|
|
||||||
self.assertEqual(1, len(conn.consumer.fetch_messages.mock_calls))
|
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_parse_url')
|
|
||||||
def test_consume_timeout(self, fake_parse_url, fake_ensure_connection):
|
|
||||||
deadline = time.time() + 3
|
|
||||||
conn = kafka_driver.Connection(
|
|
||||||
self.conf, '', kafka_driver.PURPOSE_LISTEN)
|
|
||||||
|
|
||||||
conn.consumer = mock.MagicMock()
|
|
||||||
conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
|
|
||||||
|
|
||||||
self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
|
|
||||||
self.assertEqual(0, int(deadline - time.time()))
|
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_parse_url')
|
|
||||||
def test_consume_with_default_timeout(
|
|
||||||
self, fake_parse_url, fake_ensure_connection):
|
|
||||||
deadline = time.time() + 1
|
|
||||||
conn = kafka_driver.Connection(
|
|
||||||
self.conf, '', kafka_driver.PURPOSE_LISTEN)
|
|
||||||
|
|
||||||
conn.consumer = mock.MagicMock()
|
|
||||||
conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
|
|
||||||
|
|
||||||
self.assertRaises(driver_common.Timeout, conn.consume)
|
|
||||||
self.assertEqual(0, int(deadline - time.time()))
|
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_parse_url')
|
|
||||||
def test_consume_timeout_without_consumers(
|
|
||||||
self, fake_parse_url, fake_ensure_connection):
|
|
||||||
deadline = time.time() + 3
|
|
||||||
conn = kafka_driver.Connection(
|
|
||||||
self.conf, '', kafka_driver.PURPOSE_LISTEN)
|
|
||||||
conn.consumer = mock.MagicMock(return_value=None)
|
|
||||||
|
|
||||||
self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
|
|
||||||
self.assertEqual(0, int(deadline - time.time()))
|
|
||||||
|
|
||||||
|
|
||||||
class TestKafkaListener(test_utils.BaseTestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestKafkaListener, self).setUp()
|
|
||||||
self.messaging_conf.transport_driver = 'kafka'
|
|
||||||
transport = oslo_messaging.get_transport(self.conf)
|
|
||||||
self.driver = transport._driver
|
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
|
|
||||||
def test_create_listener(self, fake_consumer, fake_ensure_connection):
|
|
||||||
fake_target = oslo_messaging.Target(topic='fake_topic')
|
|
||||||
fake_targets_and_priorities = [(fake_target, 'info')]
|
|
||||||
self.driver.listen_for_notifications(fake_targets_and_priorities, None,
|
|
||||||
None, None)
|
|
||||||
self.assertEqual(1, len(fake_consumer.mock_calls))
|
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
|
|
||||||
def test_converting_targets_to_topics(self, fake_consumer,
|
|
||||||
fake_ensure_connection):
|
|
||||||
fake_targets_and_priorities = [
|
|
||||||
(oslo_messaging.Target(topic="fake_topic",
|
|
||||||
exchange="test1"), 'info'),
|
|
||||||
(oslo_messaging.Target(topic="fake_topic",
|
|
||||||
exchange="test2"), 'info'),
|
|
||||||
(oslo_messaging.Target(topic="fake_topic",
|
|
||||||
exchange="test1"), 'error'),
|
|
||||||
(oslo_messaging.Target(topic="fake_topic",
|
|
||||||
exchange="test3"), 'error'),
|
|
||||||
]
|
|
||||||
self.driver.listen_for_notifications(fake_targets_and_priorities, None,
|
|
||||||
None, None)
|
|
||||||
self.assertEqual(1, len(fake_consumer.mock_calls))
|
|
||||||
fake_consumer.assert_called_once_with(set(['fake_topic.error',
|
|
||||||
'fake_topic.info']),
|
|
||||||
None)
|
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
|
||||||
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
|
|
||||||
def test_stop_listener(self, fake_consumer, fake_client):
|
|
||||||
fake_target = oslo_messaging.Target(topic='fake_topic')
|
|
||||||
fake_targets_and_priorities = [(fake_target, 'info')]
|
|
||||||
listener = self.driver.listen_for_notifications(
|
|
||||||
fake_targets_and_priorities, None, None, None)._poll_style_listener
|
|
||||||
listener.conn.consume = mock.MagicMock()
|
|
||||||
listener.conn.consume.return_value = (
|
|
||||||
iter([kafka.common.KafkaMessage(
|
|
||||||
topic='fake_topic', partition=0, offset=0,
|
|
||||||
key=None, value='{"message": {"fake": "fake_message_1"},'
|
|
||||||
'"context": {"fake": "fake_context_1"}}')]))
|
|
||||||
listener.poll()
|
|
||||||
self.assertEqual(1, len(listener.conn.consume.mock_calls))
|
|
||||||
listener.conn.stop_consuming = mock.MagicMock()
|
|
||||||
listener.stop()
|
|
||||||
fake_response = listener.poll()
|
|
||||||
self.assertEqual(1, len(listener.conn.consume.mock_calls))
|
|
||||||
self.assertEqual([], fake_response)
|
|
||||||
|
@ -1,72 +0,0 @@
|
|||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import time
|
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
|
|
||||||
import oslo_messaging
|
|
||||||
from oslo_messaging.tests.functional import utils
|
|
||||||
|
|
||||||
|
|
||||||
class TestWithRealKafkaBroker(utils.SkipIfNoTransportURL):
|
|
||||||
def setUp(self):
|
|
||||||
super(TestWithRealKafkaBroker, self).setUp(conf=cfg.ConfigOpts())
|
|
||||||
if not self.url.startswith('kafka://'):
|
|
||||||
self.skipTest("TRANSPORT_URL is not set to kafka driver")
|
|
||||||
transport = oslo_messaging.get_transport(self.conf, self.url)
|
|
||||||
self.driver = transport._driver
|
|
||||||
|
|
||||||
def test_send_and_receive_message(self):
|
|
||||||
target = oslo_messaging.Target(
|
|
||||||
topic="fake_topic", exchange='fake_exchange')
|
|
||||||
targets_and_priorities = [(target, 'fake_info')]
|
|
||||||
|
|
||||||
listener = self.driver.listen_for_notifications(
|
|
||||||
targets_and_priorities, None, None, None)._poll_style_listener
|
|
||||||
fake_context = {"fake_context_key": "fake_context_value"}
|
|
||||||
fake_message = {"fake_message_key": "fake_message_value"}
|
|
||||||
self.driver.send_notification(
|
|
||||||
target, fake_context, fake_message, None)
|
|
||||||
|
|
||||||
received_message = listener.poll()[0]
|
|
||||||
self.assertEqual(fake_context, received_message.ctxt)
|
|
||||||
self.assertEqual(fake_message, received_message.message)
|
|
||||||
|
|
||||||
def test_send_and_receive_message_without_exchange(self):
|
|
||||||
target = oslo_messaging.Target(topic="fake_no_exchange_topic")
|
|
||||||
targets_and_priorities = [(target, 'fake_info')]
|
|
||||||
|
|
||||||
listener = self.driver.listen_for_notifications(
|
|
||||||
targets_and_priorities, None, None, None)._poll_style_listener
|
|
||||||
fake_context = {"fake_context_key": "fake_context_value"}
|
|
||||||
fake_message = {"fake_message_key": "fake_message_value"}
|
|
||||||
self.driver.send_notification(
|
|
||||||
target, fake_context, fake_message, None)
|
|
||||||
|
|
||||||
received_message = listener.poll()[0]
|
|
||||||
self.assertEqual(fake_context, received_message.ctxt)
|
|
||||||
self.assertEqual(fake_message, received_message.message)
|
|
||||||
|
|
||||||
def test_receive_message_from_empty_topic_with_timeout(self):
|
|
||||||
target = oslo_messaging.Target(
|
|
||||||
topic="fake_empty_topic", exchange='fake_empty_exchange')
|
|
||||||
targets_and_priorities = [(target, 'fake_info')]
|
|
||||||
|
|
||||||
listener = self.driver.listen_for_notifications(
|
|
||||||
targets_and_priorities, None, None, None)._poll_style_listener
|
|
||||||
|
|
||||||
deadline = time.time() + 3
|
|
||||||
received_message = listener.poll(batch_timeout=3)
|
|
||||||
self.assertEqual(0, int(deadline - time.time()))
|
|
||||||
self.assertEqual([], received_message)
|
|
@ -32,7 +32,7 @@ class OptsTestCase(test_utils.BaseTestCase):
|
|||||||
super(OptsTestCase, self).setUp()
|
super(OptsTestCase, self).setUp()
|
||||||
|
|
||||||
def _test_list_opts(self, result):
|
def _test_list_opts(self, result):
|
||||||
self.assertEqual(6, len(result))
|
self.assertEqual(7, len(result))
|
||||||
|
|
||||||
groups = [g for (g, l) in result]
|
groups = [g for (g, l) in result]
|
||||||
self.assertIn(None, groups)
|
self.assertIn(None, groups)
|
||||||
@ -41,6 +41,7 @@ class OptsTestCase(test_utils.BaseTestCase):
|
|||||||
self.assertIn('oslo_messaging_amqp', groups)
|
self.assertIn('oslo_messaging_amqp', groups)
|
||||||
self.assertIn('oslo_messaging_notifications', groups)
|
self.assertIn('oslo_messaging_notifications', groups)
|
||||||
self.assertIn('oslo_messaging_rabbit', groups)
|
self.assertIn('oslo_messaging_rabbit', groups)
|
||||||
|
self.assertIn('oslo_messaging_kafka', groups)
|
||||||
|
|
||||||
opt_names = [o.name for (g, l) in result for o in l]
|
opt_names = [o.name for (g, l) in result for o in l]
|
||||||
self.assertIn('rpc_backend', opt_names)
|
self.assertIn('rpc_backend', opt_names)
|
||||||
|
@ -21,7 +21,12 @@ redis>=2.10.0 # MIT
|
|||||||
pyzmq>=14.3.1 # LGPL+BSD
|
pyzmq>=14.3.1 # LGPL+BSD
|
||||||
|
|
||||||
# for test_impl_kafka
|
# for test_impl_kafka
|
||||||
kafka-python<1.0.0,>=0.9.5 # Apache-2.0
|
# NOTE(sileht) temporary commented since requirements repo cap it to <1.0.0
|
||||||
|
# due to monasca project that have some concern with newer version.
|
||||||
|
# The driver is currently experimental, python-kafka<1.0.0 API have major issue
|
||||||
|
# that can't make the oslo.messaging driver works, so we prefer having a working
|
||||||
|
# driver with a non-synced dep, that the reverse
|
||||||
|
# kafka-python>=1.3.1 # Apache-2.0
|
||||||
|
|
||||||
# when we can require tox>= 1.4, this can go into tox.ini:
|
# when we can require tox>= 1.4, this can go into tox.ini:
|
||||||
# [testenv:cover]
|
# [testenv:cover]
|
||||||
|
@ -27,4 +27,11 @@ pip install -c$localfile openstack-requirements
|
|||||||
edit-constraints $localfile -- $CLIENT_NAME
|
edit-constraints $localfile -- $CLIENT_NAME
|
||||||
|
|
||||||
pip install -c$localfile -U $*
|
pip install -c$localfile -U $*
|
||||||
|
# NOTE(sileht) temporary overrided since requirements repo cap it to <1.0.0
|
||||||
|
# due to monasca project that have some concern with newer version.
|
||||||
|
# The driver is currently experimental, python-kafka<1.0.0 API have major issue
|
||||||
|
# that can't make the oslo.messaging driver works, so we prefer having a working
|
||||||
|
# driver with a non-synced dep, that the reverse
|
||||||
|
pip install -U 'kafka-python>=1.3.1'
|
||||||
|
|
||||||
exit $?
|
exit $?
|
||||||
|
1
tox.ini
1
tox.ini
@ -58,6 +58,7 @@ commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args=
|
|||||||
setenv =
|
setenv =
|
||||||
{[testenv]setenv}
|
{[testenv]setenv}
|
||||||
TRANSPORT_DRIVER=kafka
|
TRANSPORT_DRIVER=kafka
|
||||||
|
kafka-python>=1.3.1
|
||||||
commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
|
commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
|
||||||
|
|
||||||
[testenv:py27-func-amqp1]
|
[testenv:py27-func-amqp1]
|
||||||
|
Loading…
Reference in New Issue
Block a user