Add a driver for Apache Kafka
Add a driver for Apache Kafka, supporting notification delivery via Kafka. This driver is experimental until functional and integration tests are in place.

Change-Id: I7a5d8e3259b21d5e29ed3b795d04952e1d13ad08
Implements: blueprint adding-kafka-support
parent 33c1010c32
commit 67c63031f5
363  oslo_messaging/_drivers/impl_kafka.py  Normal file
@@ -0,0 +1,363 @@
# Copyright (C) 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import threading

import kafka
from kafka.common import KafkaError
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils

from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import pool as driver_pool
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LW


LOG = logging.getLogger(__name__)

PURPOSE_SEND = 'send'
PURPOSE_LISTEN = 'listen'

kafka_opts = [
    cfg.StrOpt('kafka_default_host', default='localhost',
               help='Default Kafka broker host'),

    cfg.IntOpt('kafka_default_port', default=9092,
               help='Default Kafka broker port'),

    cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024,
               help='Max fetch bytes of Kafka consumer'),

    cfg.FloatOpt('kafka_consumer_timeout', default=1.0,
                 help='Default timeout (in seconds) for Kafka consumers'),

    cfg.IntOpt('pool_size', default=10,
               help='Pool size for Kafka consumers'),
]
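
# Note: KafkaDriver.__init__ (below) registers these options under the
# [oslo_messaging_kafka] configuration group, so that is the section a
# config file would set them in.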

CONF = cfg.CONF


def pack_context_with_message(ctxt, msg):
    """Pack context into msg."""
    if isinstance(ctxt, dict):
        context_d = ctxt
    else:
        context_d = ctxt.to_dict()

    return {'message': msg, 'context': context_d}
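
# The resulting on-wire envelope has the shape
# {'message': <payload dict>, 'context': <context dict>}; KafkaListener.poll
# below unpacks the same two keys on the consumer side.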


def target_to_topic(target):
    """Convert a target into a topic string.

    :param target: Message destination target
    :type target: oslo_messaging.Target
    """
    if target.exchange is None:
        return target.topic
    return "%s_%s" % (target.exchange, target.topic)


class Connection(object):

    def __init__(self, conf, url, purpose):

        driver_conf = conf.oslo_messaging_kafka

        self.conf = conf
        self.kafka_client = None
        self.producer = None
        self.consumer = None
        self.fetch_messages_max_bytes = driver_conf.kafka_max_fetch_bytes
        self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
        self.url = url
        self._parse_url()
        # TODO: support manual commit in addition to auto-commit.
        # When auto_commit is False, a consumer can manually signal
        # completion of a subscription. Currently the non-auto-commit
        # option is not supported.
        self.auto_commit = True
        self._consume_loop_stopped = False

    def _parse_url(self):
        driver_conf = self.conf.oslo_messaging_kafka
        try:
            self.host = self.url.hosts[0].hostname
        except (AttributeError, IndexError):
            self.host = driver_conf.kafka_default_host

        try:
            self.port = self.url.hosts[0].port
        except (AttributeError, IndexError):
            self.port = driver_conf.kafka_default_port

        if self.host is None:
            self.host = driver_conf.kafka_default_host

        if self.port is None:
            self.port = driver_conf.kafka_default_port

    def notify_send(self, topic, ctxt, msg, retry):
        """Send messages to the Kafka broker.

        :param topic: String of the topic
        :param ctxt: context for the messages
        :param msg: messages for publishing
        :param retry: the maximum number of retries
        """
        message = pack_context_with_message(ctxt, msg)
        self._ensure_connection()
        self._send_and_retry(message, topic, retry)

    def _send_and_retry(self, message, topic, retry):
        current_retry = 0
        if not isinstance(message, str):
            message = jsonutils.dumps(message)
        while message is not None:
            try:
                self._send(message, topic)
                message = None
            except Exception:
                LOG.warning(_LW("Failed to publish a message of topic %s"),
                            topic)
                current_retry += 1
                if retry is not None and current_retry >= retry:
                    LOG.exception(_LE("Failed to send data after reaching "
                                      "the maximum number of retries"))
                    message = None

    def _send(self, message, topic):
        self.producer.send_messages(topic, message)

    def consume(self, timeout=None):
        """Receive messages, up to the configured maximum fetch size.

        This method contains no endless subscription loop: it returns as
        soon as a batch of messages has been fetched, which is helpful
        when we want to control the rate of consumption.
        """
        duration = (self.consumer_timeout if timeout is None else timeout)
        timer = driver_common.DecayingTimer(duration=duration)
        timer.start()

        def _raise_timeout():
            LOG.debug('Timed out waiting for Kafka response')
            raise driver_common.Timeout()

        poll_timeout = (self.consumer_timeout if timeout is None
                        else min(timeout, self.consumer_timeout))

        while True:
            if self._consume_loop_stopped:
                return
            try:
                next_timeout = poll_timeout * 1000.0
                # TODO: use the configure() method instead.
                # Currently KafkaConsumer does not support updating only
                # the fetch_max_wait_ms parameter, so we poke the private
                # _config dict directly.
                self.consumer._config['fetch_max_wait_ms'] = next_timeout
                messages = list(self.consumer.fetch_messages())
            except Exception as e:
                LOG.exception(_LE("Failed to consume messages: %s"), e)
                messages = None

            if not messages:
                poll_timeout = timer.check_return(
                    _raise_timeout, maximum=self.consumer_timeout)
                continue

            return messages

    def stop_consuming(self):
        self._consume_loop_stopped = True

    def reset(self):
        """Reset a connection so it can be used again."""
        if self.kafka_client:
            self.kafka_client.close()
            self.kafka_client = None
        if self.producer:
            self.producer.stop()
        self.producer = None
        self.consumer = None

    def close(self):
        if self.kafka_client:
            self.kafka_client.close()
            self.kafka_client = None
        if self.producer:
            self.producer.stop()
        self.producer = None
        self.consumer = None

    def commit(self):
        """Commit is used by subscribers belonging to the same group.

        After consuming messages, commit is called to prevent the other
        subscribers belonging to the same group from re-consuming the
        same messages.

        Currently the self.auto_commit option is always True, so there
        is no need to call this function.
        """
        self.consumer.commit()

    def _ensure_connection(self):
        if self.kafka_client:
            return
        try:
            self.kafka_client = kafka.KafkaClient(
                "%s:%s" % (self.host, str(self.port)))
            self.producer = kafka.SimpleProducer(self.kafka_client)
        except KafkaError as e:
            LOG.exception(_LE("Kafka connection is not available: %s"), e)
            self.kafka_client = None

    def declare_topic_consumer(self, topics, group=None):
        self.consumer = kafka.KafkaConsumer(
            *topics, group_id=group,
            metadata_broker_list=["%s:%s" % (self.host, str(self.port))],
            # auto_commit_enable=self.auto_commit,
            fetch_message_max_bytes=self.fetch_messages_max_bytes)


class OsloKafkaMessage(base.IncomingMessage):

    def __init__(self, listener, ctxt, message):
        super(OsloKafkaMessage, self).__init__(listener, ctxt, message)

    def requeue(self):
        LOG.warning(_LW("requeue is not supported"))

    def reply(self, reply=None, failure=None, log_failure=True):
        LOG.warning(_LW("reply is not supported"))


class KafkaListener(base.Listener):

    def __init__(self, driver, conn):
        super(KafkaListener, self).__init__(driver)
        self._stopped = threading.Event()
        self.conn = conn
        self.incoming_queue = []

    def poll(self, timeout=None):
        while not self._stopped.is_set():
            if self.incoming_queue:
                return self.incoming_queue.pop(0)
            try:
                # consume() returns None when its loop has been stopped,
                # so guard the iteration below.
                messages = self.conn.consume(timeout=timeout) or []
                for msg in messages:
                    message = msg.value
                    message = jsonutils.loads(message)
                    self.incoming_queue.append(OsloKafkaMessage(
                        listener=self, ctxt=message['context'],
                        message=message['message']))
            except driver_common.Timeout:
                return None

    def stop(self):
        self._stopped.set()
        self.conn.stop_consuming()

    def cleanup(self):
        self.conn.close()

    def commit(self):
        # TODO: support manual as well as auto commit.
        # It would be better to let users commit manually and to support
        # the self.auto_commit = False option. For now, this commit
        # function is meaningless since users cannot call it and the
        # auto_commit option is always True.
        self.conn.commit()


class KafkaDriver(base.BaseDriver):
    """Note: the current implementation of this driver is experimental.

    Functional and/or integration tests will be enabled for this driver.
    """

    def __init__(self, conf, url, default_exchange=None,
                 allowed_remote_exmods=None):

        opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
                                 title='Kafka driver options')
        conf.register_group(opt_group)
        conf.register_opts(kafka_opts, group=opt_group)

        super(KafkaDriver, self).__init__(
            conf, url, default_exchange, allowed_remote_exmods)

        self.connection_pool = driver_pool.ConnectionPool(
            self.conf, self.conf.oslo_messaging_kafka.pool_size,
            self._url, Connection)
        self.listeners = []

    def cleanup(self):
        for c in self.listeners:
            c.cleanup()
        self.listeners = []

    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
             retry=None):
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def send_notification(self, target, ctxt, message, version, retry=None):
        """Send a notification to the Kafka brokers.

        :param target: Message destination target
        :type target: oslo_messaging.Target
        :param ctxt: Message context
        :type ctxt: dict
        :param message: Message payload to pass
        :type message: dict
        :param version: Messaging API version (currently not used)
        :type version: str
        :param retry: an optional retries configuration for publishing:
                      None means retry forever,
                      0 means no retry,
                      N means N retries
        :type retry: int
        """
        with self._get_connection(purpose=PURPOSE_SEND) as conn:
            conn.notify_send(target_to_topic(target), ctxt, message, retry)

    def listen(self, target):
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def listen_for_notifications(self, targets_and_priorities, pool=None):
        """Listen to a specified list of targets on Kafka brokers.

        :param targets_and_priorities: List of pairs (target, priority);
                                       priority is not used by the kafka
                                       driver, and
                                       <target.exchange>_<target.topic>
                                       is used as the Kafka topic
        :type targets_and_priorities: list
        :param pool: consumer group of Kafka consumers
        :type pool: string
        """
        conn = self._get_connection(purpose=PURPOSE_LISTEN)
        topics = []
        for target, priority in targets_and_priorities:
            topics.append(target_to_topic(target))

        conn.declare_topic_consumer(topics, pool)

        listener = KafkaListener(self, conn)
        self.listeners.append(listener)
        return listener

    def _get_connection(self, purpose):
        return driver_common.ConnectionContext(self.connection_pool, purpose)
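
For orientation, here is a minimal notification-publishing sketch using this driver (not part of the commit; the broker address and Notifier arguments are illustrative assumptions):

    from oslo_config import cfg
    import oslo_messaging

    # The 'kafka' URL scheme selects this driver via the entry point
    # registered in setup.cfg below.
    transport = oslo_messaging.get_transport(cfg.CONF,
                                             'kafka://localhost:9092')
    notifier = oslo_messaging.Notifier(transport,
                                       driver='messaging',
                                       publisher_id='example-service',
                                       topic='notifications')
    # Emits a notification; the driver publishes it to the Kafka topic
    # derived by target_to_topic() above.
    notifier.info({'request_id': 'req-1'}, 'example.event', {'key': 'value'})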

288  oslo_messaging/tests/drivers/test_impl_kafka.py  Normal file
@@ -0,0 +1,288 @@
# Copyright (C) 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json
import time

import kafka
from kafka.common import KafkaError
import mock
import testscenarios
from testtools.testcase import unittest

import oslo_messaging
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_messaging.tests import utils as test_utils

load_tests = testscenarios.load_tests_apply_scenarios

KAFKA_BROKER = 'localhost:9092'
KAFKA_BROKER_URL = 'kafka://localhost:9092'


def _is_kafka_service_running():
    """Check whether the Kafka service is running."""
    kafka_running = True
    try:
        broker = KAFKA_BROKER
        kafka.KafkaClient(broker)
    except KafkaError:
        # Kafka service is not running.
        kafka_running = False
    return kafka_running


class TestKafkaDriverLoad(test_utils.BaseTestCase):

    def setUp(self):
        super(TestKafkaDriverLoad, self).setUp()
        self.messaging_conf.transport_driver = 'kafka'

    def test_driver_load(self):
        transport = oslo_messaging.get_transport(self.conf)
        self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)


class TestKafkaTransportURL(test_utils.BaseTestCase):

    scenarios = [
        ('none', dict(url=None,
                      expected=[dict(host='localhost', port=9092)])),
        ('empty', dict(url='kafka:///',
                       expected=[dict(host='localhost', port=9092)])),
        ('host', dict(url='kafka://127.0.0.1',
                      expected=[dict(host='127.0.0.1', port=9092)])),
        ('port', dict(url='kafka://localhost:1234',
                      expected=[dict(host='localhost', port=1234)])),
    ]

    def setUp(self):
        super(TestKafkaTransportURL, self).setUp()
        self.messaging_conf.transport_driver = 'kafka'

    def test_transport_url(self):
        transport = oslo_messaging.get_transport(self.conf, self.url)
        self.addCleanup(transport.cleanup)
        driver = transport._driver

        conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
        self.assertEqual(self.expected[0]['host'], conn.host)
        self.assertEqual(self.expected[0]['port'], conn.port)


class TestKafkaDriver(test_utils.BaseTestCase):
    """Unit test cases for the kafka driver."""

    def setUp(self):
        super(TestKafkaDriver, self).setUp()
        self.messaging_conf.transport_driver = 'kafka'
        transport = oslo_messaging.get_transport(self.conf)
        self.driver = transport._driver

    def test_send(self):
        target = oslo_messaging.Target(topic="topic_test")
        self.assertRaises(NotImplementedError,
                          self.driver.send, target, {}, {})

    def test_send_notification(self):
        target = oslo_messaging.Target(topic="topic_test")

        with mock.patch.object(
                kafka_driver.Connection, 'notify_send') as fake_send:
            self.driver.send_notification(target, {}, {}, None)
            self.assertEqual(1, len(fake_send.mock_calls))

    def test_listen(self):
        target = oslo_messaging.Target(topic="topic_test")
        self.assertRaises(NotImplementedError, self.driver.listen, target)


class TestKafkaConnection(test_utils.BaseTestCase):

    def setUp(self):
        super(TestKafkaConnection, self).setUp()
        self.messaging_conf.transport_driver = 'kafka'
        transport = oslo_messaging.get_transport(self.conf)
        self.driver = transport._driver

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, '_send')
    def test_notify(self, fake_send, fake_ensure_connection):
        conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
        conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
                         {"fake_text": "fake_message_1"}, 10)
        self.assertEqual(1, len(fake_send.mock_calls))

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, '_send')
    def test_notify_with_retry(self, fake_send, fake_ensure_connection):
        conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
        fake_send.side_effect = KafkaError("fake_exception")
        conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
                         {"fake_text": "fake_message_2"}, 10)
        self.assertEqual(10, len(fake_send.mock_calls))

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, '_parse_url')
    def test_consume(self, fake_parse_url, fake_ensure_connection):
        fake_message = {
            "context": {"fake": "fake_context_1"},
            "message": {"fake": "fake_message_1"}}

        conn = kafka_driver.Connection(
            self.conf, '', kafka_driver.PURPOSE_LISTEN)

        conn.consumer = mock.MagicMock()
        conn.consumer.fetch_messages = mock.MagicMock(
            return_value=iter([json.dumps(fake_message)]))

        self.assertEqual(fake_message, json.loads(conn.consume()[0]))
        self.assertEqual(1, len(conn.consumer.fetch_messages.mock_calls))

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, '_parse_url')
    def test_consume_timeout(self, fake_parse_url, fake_ensure_connection):
        deadline = time.time() + 3
        conn = kafka_driver.Connection(
            self.conf, '', kafka_driver.PURPOSE_LISTEN)

        conn.consumer = mock.MagicMock()
        conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))

        self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
        self.assertEqual(0, int(deadline - time.time()))

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, '_parse_url')
    def test_consume_with_default_timeout(
            self, fake_parse_url, fake_ensure_connection):
        deadline = time.time() + 1
        conn = kafka_driver.Connection(
            self.conf, '', kafka_driver.PURPOSE_LISTEN)

        conn.consumer = mock.MagicMock()
        conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))

        self.assertRaises(driver_common.Timeout, conn.consume)
        self.assertEqual(0, int(deadline - time.time()))

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, '_parse_url')
    def test_consume_timeout_without_consumers(
            self, fake_parse_url, fake_ensure_connection):
        deadline = time.time() + 3
        conn = kafka_driver.Connection(
            self.conf, '', kafka_driver.PURPOSE_LISTEN)
        conn.consumer = mock.MagicMock(return_value=None)

        self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
        self.assertEqual(0, int(deadline - time.time()))


class TestKafkaListener(test_utils.BaseTestCase):

    def setUp(self):
        super(TestKafkaListener, self).setUp()
        self.messaging_conf.transport_driver = 'kafka'
        transport = oslo_messaging.get_transport(self.conf)
        self.driver = transport._driver

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
    def test_create_listener(self, fake_consumer, fake_ensure_connection):
        fake_target = oslo_messaging.Target(topic='fake_topic')
        fake_targets_and_priorities = [(fake_target, 'info')]
        listener = self.driver.listen_for_notifications(
            fake_targets_and_priorities)
        self.assertEqual(1, len(fake_consumer.mock_calls))

    @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
    @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
    def test_stop_listener(self, fake_consumer, fake_client):
        fake_target = oslo_messaging.Target(topic='fake_topic')
        fake_targets_and_priorities = [(fake_target, 'info')]
        listener = self.driver.listen_for_notifications(
            fake_targets_and_priorities)
        listener.conn.consume = mock.MagicMock()
        listener.conn.consume.return_value = (
            iter([kafka.common.KafkaMessage(
                topic='fake_topic', partition=0, offset=0,
                key=None, value='{"message": {"fake": "fake_message_1"},'
                                '"context": {"fake": "fake_context_1"}}')]))
        listener.poll()
        self.assertEqual(1, len(listener.conn.consume.mock_calls))
        listener.conn.stop_consuming = mock.MagicMock()
        listener.stop()
        fake_response = listener.poll()
        self.assertEqual(1, len(listener.conn.consume.mock_calls))
        self.assertIsNone(fake_response)


class TestWithRealKafkaBroker(test_utils.BaseTestCase):

    def setUp(self):
        super(TestWithRealKafkaBroker, self).setUp()
        self.messaging_conf.transport_driver = 'kafka'
        transport = oslo_messaging.get_transport(self.conf, KAFKA_BROKER_URL)
        self.driver = transport._driver

    @unittest.skipUnless(
        _is_kafka_service_running(), "Kafka service is not available")
    def test_send_and_receive_message(self):
        target = oslo_messaging.Target(
            topic="fake_topic", exchange='fake_exchange')
        targets_and_priorities = [(target, 'fake_info')]

        listener = self.driver.listen_for_notifications(
            targets_and_priorities)
        fake_context = {"fake_context_key": "fake_context_value"}
        fake_message = {"fake_message_key": "fake_message_value"}
        self.driver.send_notification(
            target, fake_context, fake_message, None)

        received_message = listener.poll()
        self.assertEqual(fake_context, received_message.ctxt)
        self.assertEqual(fake_message, received_message.message)

    @unittest.skipUnless(
        _is_kafka_service_running(), "Kafka service is not available")
    def test_send_and_receive_message_without_exchange(self):
        target = oslo_messaging.Target(topic="fake_no_exchange_topic")
        targets_and_priorities = [(target, 'fake_info')]

        listener = self.driver.listen_for_notifications(
            targets_and_priorities)
        fake_context = {"fake_context_key": "fake_context_value"}
        fake_message = {"fake_message_key": "fake_message_value"}
        self.driver.send_notification(
            target, fake_context, fake_message, None)

        received_message = listener.poll()
        self.assertEqual(fake_context, received_message.ctxt)
        self.assertEqual(fake_message, received_message.message)

    @unittest.skipUnless(
        _is_kafka_service_running(), "Kafka service is not available")
    def test_receive_message_from_empty_topic_with_timeout(self):
        target = oslo_messaging.Target(
            topic="fake_empty_topic", exchange='fake_empty_exchange')
        targets_and_priorities = [(target, 'fake_info')]

        listener = self.driver.listen_for_notifications(
            targets_and_priorities)

        deadline = time.time() + 3
        received_message = listener.poll(timeout=3)
        self.assertEqual(0, int(deadline - time.time()))
        self.assertIsNone(received_message)
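
To make the listener-side flow concrete, here is a hypothetical sketch of the pattern TestWithRealKafkaBroker exercises, outside the test harness (it assumes a broker at localhost:9092, as the tests do):

    from oslo_config import cfg
    import oslo_messaging

    transport = oslo_messaging.get_transport(cfg.CONF,
                                             'kafka://localhost:9092')
    driver = transport._driver

    # Targets map to Kafka topics via target_to_topic(); this one becomes
    # 'demo_notifications'. The priority element of each pair is unused.
    target = oslo_messaging.Target(topic='notifications', exchange='demo')
    listener = driver.listen_for_notifications([(target, 'info')])

    driver.send_notification(target, {'user': 'demo'}, {'event': 'ping'},
                             None)
    # Returns an OsloKafkaMessage, or None on timeout.
    incoming = listener.poll(timeout=3)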

setup.cfg
@@ -29,6 +29,9 @@ oslo.messaging.drivers =
    zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver
    amqp = oslo_messaging._drivers.protocols.amqp.driver:ProtonDriver

    # This driver supports notification usage only
    kafka = oslo_messaging._drivers.impl_kafka:KafkaDriver

    # To avoid confusion
    kombu = oslo_messaging._drivers.impl_rabbit:RabbitDriver

test-requirements.txt
@@ -21,6 +21,9 @@ redis>=2.10.0
# for test_impl_zmq
pyzmq>=14.3.1 # LGPL+BSD

# for test_impl_kafka
kafka-python>=0.9.2 # Apache-2.0

# when we can require tox>= 1.4, this can go into tox.ini:
# [testenv:cover]
# deps = {[testenv]deps} coverage