When messages are large, compressing them before sending improves
Kafka efficiency, so the driver needs to support Kafka message
compression.
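A minimal sketch of what enabling compression could look like with the
confluent-kafka client the driver uses; the broker address, codec
choice and topic name are illustrative assumptions, not the driver's
actual configuration:
    from confluent_kafka import Producer

    # compression.codec is a librdkafka setting; gzip is just an example value.
    producer = Producer({'bootstrap.servers': 'localhost:9092',
                         'compression.codec': 'gzip'})
    producer.produce('notifications', value=b'{"event_type": "example"}')
    producer.flush()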
Change-Id: I9e86d43ad934c1f82dc3dcf93d317538f9d2568e
Implements: blueprint support-kafka-compression
This patch switches the Kafka Python client from kafka-python to
confluent-kafka due to documented threading issues with the
kafka-python consumer and the recommendation to use multiprocessing.
The confluent-kafka client leverages the high-performance librdkafka
C client and is safe for use from multiple threads.
This patch:
* switches to confluent-kafka library
* revises consumer and producer message operations
* utilizes event.tpool method for confluent-kafka blocking calls
* updates unit tests
* adds kafka specific timeouts for functional tests
* adds release note
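A rough sketch of the tpool idea from the list above; it assumes
eventlet and confluent-kafka, with illustrative broker, group and
topic names:
    from eventlet import tpool
    from confluent_kafka import Consumer

    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': 'oslo-example'})
    consumer.subscribe(['notifications'])

    # poll() blocks inside the librdkafka C library, so run it in a real
    # OS thread via eventlet's tpool instead of blocking the green thread.
    message = tpool.execute(consumer.poll, 1.0)
    if message is not None and message.error() is None:
        print(message.value())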
Depends-On: Ice374dca539b8ed1b1965b75379bad5140121483
Change-Id: Idfb9fe3700d882c8285c6dc56b0620951178eba2
A change to global-requirements limits use of the monotonic library
to Python versions earlier than 3.3 (later versions have built-in
support for a monotonic clock), so limit it in requirements.txt as
well.
Note: this patch also updates the kafka driver (due to a deprecated
exception in the library) in order to pass the unit tests.
Change-Id: Id6b0814e05a0e548a8c2a5359daf1a6878cf6859
The Kafka driver deprecated the kafka_default_host and
kafka_default_port options in 5.10.0, released in Ocata. Remove them.
Change-Id: I206e68ec1624bb6d5d6ba320572530352bbd4378
This adds an optional call_monitor_timeout parameter to the RPC client,
which if specified, will enable heartbeating of long-running calls by
the server. This enables the user to increase the regular timeout to
a much larger value, allowing calls to take a very long time, but
with heartbeating to indicate that they are still running on the server
side. If the server stops heartbeating, then the call_monitor_timeout
takes over and we fail with the usual MessagingTimeout instead of waiting
for the longer overall timeout to expire.
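A hedged sketch of how a caller might use this; the transport setup,
target and method names are illustrative only:
    import oslo_messaging
    from oslo_config import cfg

    transport = oslo_messaging.get_rpc_transport(cfg.CONF)
    target = oslo_messaging.Target(topic='long_running_topic')
    client = oslo_messaging.RPCClient(transport, target,
                                      timeout=3600,             # overall cap
                                      call_monitor_timeout=60)  # heartbeat gap
    # Fails with MessagingTimeout if the server stops heartbeating for 60s,
    # even though the call itself is allowed to run for up to an hour.
    result = client.call({}, 'run_long_job', job_id='42')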
Change-Id: I60334aaf019f177a984583528b71d00859d31f84
This adds a heartbeat() method to RpcIncomingMessage to be used by a
subsequent patch implementing active-call heartbeating. It is
unimplemented in all drivers for the moment.
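An interface-only sketch (not the real base class) of what the new
hook adds for drivers:
    class ExampleIncomingMessage(object):
        """Shape of an RPC incoming message once heartbeat() exists."""

        def reply(self, reply=None, failure=None):
            pass  # send the RPC reply back to the caller

        def heartbeat(self):
            # New hook: signal that the call is still being processed.
            # Left unimplemented by every driver for now.
            raise NotImplementedError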
Change-Id: If8ab0dc16e3bef69d5a826c31c0fe35e403ac6a1
This patch changes the default driver behavior to synchronously
commit messages following consumer poll (see the sketch below). A
configuration option can enable asynchronous auto commit if desired.
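A minimal sketch of the new default, assuming the kafka-python client
and illustrative broker/topic/group names: auto commit is disabled and
the offsets are committed synchronously after each poll:
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('notifications',
                             bootstrap_servers='localhost:9092',
                             group_id='oslo-example',
                             enable_auto_commit=False)

    records = consumer.poll(timeout_ms=1000)
    for partition, messages in records.items():
        for message in messages:
            print(message.value)
    consumer.commit()  # blocks until the polled offsets are committed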
Depends-On: I5b4f01c928373cac530aa6877a34c684577bc64e
Change-Id: I92a3dc95c5d424aa722138195fef5a855a66b31d
Emulate vhost support by adding the virtual host name to the
topic created on the kafka server. Also, update connection
management for the producer and consumer.
This patch:
* updates target-to-topic generation (see the sketch below)
* adds consumer and producer connection classes
* removes the connection pool
* updates the driver tests
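A hypothetical helper showing the idea behind the target-to-topic
change; the function name, separator and argument order are
illustrative, not the driver's actual code:
    def target_to_topic(topic, priority=None, vhost=None):
        """Emulate vhosts by folding the virtual host into the topic name."""
        parts = [topic]
        if priority:
            parts.append(priority)
        if vhost:
            parts.append(vhost)
        return '.'.join(parts)

    # target_to_topic('notifications', 'info', 'cloud1')
    # -> 'notifications.info.cloud1'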
Change-Id: Idd164444c04e9f465a43ee909af840a41bb090c0
This patch addresses a number of issues that prevented the functional
tests from running. The functional tests now execute and can complete
successfully. At times a test will still fail (noticeably in CI),
indicating an underlying issue with consumer interaction with the
Kafka server. It would be beneficial to merge this patch as it
provides repeatability and visibility for driver-Kafka server
integration, facilitating additional debugging and testing.
This patch:
* removes use of the deprecated get_transport
* overrides consumer_group for each test (see the sketch below)
* changes to synchronous send
* updates to a Kafka 1.0.0 server
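A small illustrative sketch of overriding the consumer group per test
so separate runs do not share committed offsets (names are
hypothetical):
    import uuid

    def unique_consumer_group(prefix='oslo-functional'):
        """Return a consumer group name unique to this test run."""
        return '%s-%s' % (prefix, uuid.uuid4().hex)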
Depends-On: Ib552152e841a9fc0bffdcb7c3f7bc75613d0ed62
Change-Id: I7009a3b96ee250c177c10f5121eb73d908747a52
ConfigOptsProxy has been implemented only for the pika driver, while
oslo.messaging allows passing the query string for all drivers.
This change fixes that.
Closes-Bug: #1666903
Closes-Bug: #1607889
The next step is to validate the query string with ConfigOptsProxy and
raise an appropriate exception in case of misconfiguration.
Change-Id: I573334e774ccf33ecd27a85067045f3c6489ee89
consume() must return only when the user timeout is reached, not
when the driver's consumer_timeout is reached.
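A hedged sketch of the intended behaviour (illustrative code, not the
driver's): keep polling in small driver-side slices and only stop
when the caller's timeout expires:
    import time

    def consume(consumer, user_timeout, poll_timeout=1.0):
        deadline = time.monotonic() + user_timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None  # only the user timeout ends the loop
            message = consumer.poll(min(poll_timeout, remaining))
            if message is not None:
                return message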
Change-Id: I6b2b2a28038a194224e79fa37285436ca6787a0a
This producer singleton works only if a single transport uses Kafka.
If multiple transports use Kafka, they override each other's producer
on each send.
This change creates one producer per connection instead.
A later optimisation could be one producer per driver instance.
Change-Id: I429f7c5efcb41690dd1b17f856bda2425c788e53
Currently the Kafka driver for oslo.messaging uses kafka-python==0.9.5
and is mostly broken. This package version supports only the low-level
Kafka producer and consumer APIs, which are now marked as deprecated
[1]. Using these interfaces raises a big concern for message
processing, because the current KafkaConsumer has no consumer
coordination. This causes message duplication when several consumers
read one topic, which particularly affects Ceilometer and other
services that read and process notifications from other services.
The new version of kafka-python allows using asynchronous, thread-safe
message producers and coordinated consumers [1].
[1] http://kafka-python.readthedocs.io/en/master/changelog.html#feb-15-2016
The driver is currently experimental; the python-kafka<1.0.0 API has
the major issue described above, which keeps the oslo.messaging driver
from working, so we prefer having a working driver with unsynced
dependencies rather than the reverse.
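A minimal sketch of the coordinated-consumer behaviour, assuming
kafka-python >= 1.0 and illustrative names: consumers sharing a
group_id split the topic's partitions, so each notification is
processed once per group:
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('ceilometer-notifications',
                             bootstrap_servers='localhost:9092',
                             group_id='ceilometer-agent')
    for record in consumer:
        print(record.value)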
Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com>
Change-Id: I29862ed7bf56b9d8878fa8e9fb1cbd9d643908a4
The interface of BaseDriver, which is the super-class of each
underlying transport driver, has been changed, but the interface of
the kafka driver doesn't follow this change. So if a user chooses it
as the transport driver, a TypeError exception is raised when the
driver is called.
This change brings the kafka driver's interface in line with
BaseDriver's.
Change-Id: Iedd069b7f083e2cbf377f4148411f77ad758f979
Closes-Bug: #1616755
We can reduce the RabbitMQ workload by implementing an expiration
mechanism for idle connections in the pool, with the following
properties:
conn_pool_ttl (default 20 min)
conn_pool_min_size: the pool size limit for expire() (default 2)
The problem is long-lived idle connections in the pool, which can be
created by a single large burst of RPCServer workload. Each SEND
connection costs a heartbeat thread plus some network activity every
n seconds, so reducing them is worthwhile.
There are two ways to implement expiration:
[1] Create a separate thread that checks the expiration dates of
connections
[2] Call expire() on pool.get() or pool.put()
Option [1] has some threading overhead, but it is probably
insignificant because the thread can sleep 99% of the time and wake
up every 20 minutes (by default). In any case, the current
implementation is [2] (see the sketch below).
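A simplified sketch of option [2]; the class and attribute names are
illustrative, not the real pool implementation:
    import collections
    import time

    class ExpiringPool(object):
        def __init__(self, ttl=1200, min_size=2):
            self.ttl = ttl            # conn_pool_ttl, 20 min by default
            self.min_size = min_size  # conn_pool_min_size, 2 by default
            self._items = collections.deque()  # (timestamp, connection)

        def expire(self):
            now = time.monotonic()
            while (len(self._items) > self.min_size and
                   now - self._items[0][0] > self.ttl):
                _, connection = self._items.popleft()
                connection.close()

        def put(self, connection):
            self._items.append((time.monotonic(), connection))
            self.expire()

        def get(self):
            self.expire()
            return self._items.pop()[1] if self._items else None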
Change-Id: Ie8781d10549a044656824ceb78b2fe2e4f7f8b43
This patch removes the log_failure argument from the function
serialize_remote_exception and from the driver implementations
using it (because it is never used and always defaults to True),
and prevents error logging in this function (because these errors
are already logged by servers while processing incoming messages).
Change-Id: Ic01bb11d6c4f018a17f3219cdbd07ef4d30fa434
Closes-Bug: 1580352
At the moment the kafka driver can only use a URL with a single
"host:port" to define the bootstrap server, but the kafka client
supports a set of host:port addresses: "host1:port1,host2:port2", ....
This patch implements that functionality in the kafka driver for
better HA (see the sketch below).
The list self.hostaddrs on Connection stores the "host:port" strings,
collected from self.url.hosts.
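A hedged sketch of the idea (self.url.hosts and self.hostaddrs follow
the commit text; the rest is illustrative): collect every host:port
pair from the transport URL so the client receives a full bootstrap
list:
    def build_hostaddrs(url, default_port=9092):
        """Turn kafka://h1:9092,h2:9092/ into ['h1:9092', 'h2:9092']."""
        return ['%s:%s' % (host.hostname, host.port or default_port)
                for host in url.hosts]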
Change-Id: I5eece66ca6bd069a0df8c8629b4ac815f69a7c7d
Closes-Bug: #1572017
1) Add a MessageHandler base interface as an on_incoming_callback replacement
2) Move the message_handler parameter from Listener's __init__() to start()
3) Remove the wait method from the listener
Change-Id: Id414446817e3d2ff67b815074d042a9ce637ec24
The current Listener interface has a poll() method that returns
messages. To use it we need a poller thread, which is located in
MessageHandlerServer. But my investigation of the existing drivers'
code shows that some implementations have their own internal thread
for processing the connection event loop. This event loop receives
messages and stores them in a queue object, and then our poller
thread reads that queue.
This situation can be improved: we can remove the poller thread,
remove the queue object, and just call the server's on_message
callback from the connection event-loop thread.
This patch makes this possible for one of the drivers and leaves the
other drivers as they are (see the sketch below).
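A rough sketch of the improved flow (illustrative names only): the
driver's event-loop thread hands messages straight to the server's
callback, with no intermediate queue and no poller thread:
    class CallbackListener(object):
        def __init__(self, on_incoming_callback):
            self.on_incoming_callback = on_incoming_callback

        def _on_event_loop_message(self, incoming):
            # Invoked from the driver's connection event loop.
            self.on_incoming_callback([incoming])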
Change-Id: I3e3d4369d8fdadcecf079d10af58b1e4f5616047
Currently we delete the kafka_client, producer and consumer
from the Kafka driver connection when we reset it,
for example before returning it to the pool.
This is a redundant action, because the kafka_client and
the kafka producer (which uses the kafka_client for
sending messages) could be used again without any
concerns.
At the same time, we currently do not close the KafkaConsumer,
but this is needed because we should close its open
sockets to Kafka.
In this patchset all the actions in reset are changed
to this more optimal behavior (see the sketch below).
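A hedged sketch of the revised reset behaviour (simplified, not the
actual driver code): keep the reusable client and producer, but close
the consumer so its sockets to Kafka are released:
    def reset(connection):
        if connection.consumer is not None:
            connection.consumer.close()  # release the open broker sockets
            connection.consumer = None
        # kafka_client and producer are left intact; they can safely be
        # reused the next time the connection is taken from the pool.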
Change-Id: I6ff26256c933c79468f9e6cd1752181e5ace155f
Closes-bug: #1557528
The consumer in the Kafka driver should use only unique topics,
otherwise a FetchDuplicate exception will be raised.
Change-Id: I569ce446eaf05dbc3a7fd0b41a2307e940ab87fb
Closes-bug: #1555081
Currently we try to fetch messages from topics even when they have
not been created yet. This behaviour causes a KafkaConfigurationError
to be raised in the kafka driver.
Change-Id: I78cfd5ac24fbf37be5649232d0bc825319cf6402
Closes-bug: #1557521
This reverts commit bd81d09c02c5bc8561ad04de91802a5c1917d9e9.
I understand that the change was supposed to fix something, but instead it broke all tests on Python 3!?
It's wrong to blindly replace json.dumps() with jsonutils.dump_as_bytes(). In oslo.messaging, the result is usually used as a value in a dictionary, and then the whole dictionary is passed to a second serializer which also serializes to JSON.
Sorry, I don't understand everything, but at least I see that tests passed on py3 before the change and started to fail with the change.
Maybe json(utils).dumps() is misused in some places, but in that case you should write a change which only fixes those specific places, not one that replaces all calls to dumps().
Change-Id: Icd54ee8e3f5c976dfd50b4b62c7f51288649e112
Notification priority actually points to an endpoint method
to be called and cannot be ignored by the driver.
Also dropped the exchange from final topic resolution, because it
should not take part in it.
Closes-Bug: #1546081
Change-Id: I4a7c367d91c2fefc6830f0ab3cdcbc0b605127b0
The oslo.config lib includes a PortOpt to simply restrict the
range of integers available for a port type option. This patch
makes use of that.
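A minimal sketch with oslo.config; the option name mirrors the
driver's kafka_default_port purely for illustration:
    from oslo_config import cfg

    opts = [
        cfg.PortOpt('kafka_default_port',
                    default=9092,
                    help='Default Kafka broker port'),
    ]
    conf = cfg.ConfigOpts()
    conf.register_opts(opts)
    # PortOpt rejects values outside 1-65535 without a hand-written check.
    print(conf.kafka_default_port)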
Change-Id: I8cda92d6c7555b6cb788627d072174ad608fb6de
Since json.dumps() returns a Unicode string on Python 3, to
ensure that the result type is bytes on both Python 2 and Python 3
when the result is used for the message body, replace
json.dumps() / oslo_serialization.jsonutils.dumps()
with
oslo_serialization.jsonutils.dump_as_bytes().
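A small sketch of the substitution; the payload contents are
illustrative:
    from oslo_serialization import jsonutils

    payload = {'event_type': 'compute.instance.create', 'priority': 'info'}

    # json.dumps(payload) returns str on Python 3; dump_as_bytes always
    # returns bytes, which is what the message body expects.
    body = jsonutils.dump_as_bytes(payload)
    assert isinstance(body, bytes)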
Change-Id: I0e0f6b715ffc4a9ad82be52e55696d032b6d0976