Fix KafkaClient->SimpleClient references

Dana Powers
2016-01-07 17:57:24 -08:00
parent d4e85ecd1d
commit 9a8af1499c
9 changed files with 49 additions and 49 deletions
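For downstream code, the rename is mechanical: the client class is now imported as SimpleClient, so callers swap the import and constructor name. A minimal before/after sketch (broker addresses and topic name here are placeholders, not from this commit):

    # before: from kafka.client import KafkaClient
    # after:
    from kafka import SimpleClient

    client = SimpleClient(hosts='kafka01:9092,kafka02:9092')  # placeholder brokers
    # the API surface is unchanged by the rename, e.g.:
    partitions = client.get_partition_ids_for_topic('my-topic')  # hypothetical topic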

View File

@@ -94,7 +94,7 @@ class Consumer(object):
     def fetch_last_known_offsets(self, partitions=None):
         if self.group is None:
-            raise ValueError('KafkaClient.group must not be None')
+            raise ValueError('SimpleClient.group must not be None')
         if partitions is None:
             partitions = self.client.get_partition_ids_for_topic(self.topic)

View File

@@ -9,7 +9,7 @@ import time
 import six

-from kafka.client import KafkaClient
+from kafka import SimpleClient
 from kafka.common import (
     OffsetFetchRequestPayload, OffsetCommitRequestPayload,
     OffsetRequestPayload, FetchRequestPayload,
@@ -136,7 +136,7 @@ class KafkaConsumer(object):
                 'bootstrap_servers required to configure KafkaConsumer'
             )
-        self._client = KafkaClient(
+        self._client = SimpleClient(
             self._config['bootstrap_servers'],
             client_id=self._config['client_id'],
             timeout=(self._config['socket_timeout_ms'] / 1000.0)

View File

@@ -102,7 +102,7 @@ class MultiProcessConsumer(Consumer):
     parallel using multiple processes

     Arguments:
-        client: a connected KafkaClient
+        client: a connected SimpleClient
         group: a name for this consumer, used for offset storage and must be unique
             If you are connecting to a server that does not support offset
             commit/fetch (any prior to 0.8.1.1), then you *must* set this to None

View File

@@ -70,7 +70,7 @@ class SimpleConsumer(Consumer):
     for a topic

     Arguments:
-        client: a connected KafkaClient
+        client: a connected SimpleClient
         group: a name for this consumer, used for offset storage and must be unique
             If you are connecting to a server that does not support offset
             commit/fetch (any prior to 0.8.1.1), then you *must* set this to None

View File

@@ -5,7 +5,7 @@ from mock import ANY, MagicMock, patch
 import six

 from . import unittest
-from kafka import KafkaClient
+from kafka import SimpleClient
 from kafka.common import (
     ProduceRequestPayload,
     BrokerMetadata,
@@ -35,33 +35,33 @@ def mock_conn(conn, success=True):
     conn.return_value = mocked


-class TestKafkaClient(unittest.TestCase):
+class TestSimpleClient(unittest.TestCase):

     def test_init_with_list(self):
-        with patch.object(KafkaClient, 'load_metadata_for_topics'):
-            client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
+            client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])

         self.assertEqual(
             sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
             sorted(client.hosts))

     def test_init_with_csv(self):
-        with patch.object(KafkaClient, 'load_metadata_for_topics'):
-            client = KafkaClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
+            client = SimpleClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')

         self.assertEqual(
             sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
             sorted(client.hosts))

     def test_init_with_unicode_csv(self):
-        with patch.object(KafkaClient, 'load_metadata_for_topics'):
-            client = KafkaClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
+            client = SimpleClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')

         self.assertEqual(
             sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
             sorted(client.hosts))

-    @patch.object(KafkaClient, '_get_conn')
-    @patch.object(KafkaClient, 'load_metadata_for_topics')
+    @patch.object(SimpleClient, '_get_conn')
+    @patch.object(SimpleClient, 'load_metadata_for_topics')
     def test_send_broker_unaware_request_fail(self, load_metadata, conn):
         mocked_conns = {
             ('kafka01', 9092): MagicMock(),
@@ -74,7 +74,7 @@ class TestKafkaClient(unittest.TestCase):
             return mocked_conns[(host, port)]
         conn.side_effect = mock_get_conn

-        client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+        client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092'])

         req = KafkaProtocol.encode_metadata_request()
         with self.assertRaises(KafkaUnavailableError):
@@ -102,10 +102,10 @@ class TestKafkaClient(unittest.TestCase):
             return mocked_conns[(host, port)]

         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, 'load_metadata_for_topics'):
-            with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
-                client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
+            with patch.object(SimpleClient, '_get_conn', side_effect=mock_get_conn):
+                client = SimpleClient(hosts='kafka01:9092,kafka02:9092')
                 resp = client._send_broker_unaware_request(payloads=['fake request'],
                                                            encoder_fn=MagicMock(),
                                                            decoder_fn=lambda x: x)
@@ -113,7 +113,7 @@ class TestKafkaClient(unittest.TestCase):
         self.assertEqual('valid response', resp)
         mocked_conns[('kafka02', 9092)].recv.assert_called_once_with()

-    @patch('kafka.client.KafkaClient._get_conn')
+    @patch('kafka.SimpleClient._get_conn')
     @patch('kafka.client.KafkaProtocol')
     def test_load_metadata(self, protocol, conn):
@@ -143,7 +143,7 @@ class TestKafkaClient(unittest.TestCase):
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

         # client loads metadata at init
-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])
         self.assertDictEqual({
             TopicPartition('topic_1', 0): brokers[1],
             TopicPartition('topic_noleader', 0): None,
@@ -163,7 +163,7 @@ class TestKafkaClient(unittest.TestCase):
         # This should not raise
         client.load_metadata_for_topics('topic_no_leader')

-    @patch('kafka.client.KafkaClient._get_conn')
+    @patch('kafka.SimpleClient._get_conn')
     @patch('kafka.client.KafkaProtocol')
     def test_has_metadata_for_topic(self, protocol, conn):
@@ -184,7 +184,7 @@ class TestKafkaClient(unittest.TestCase):
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])

         # Topics with no partitions return False
         self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
@@ -193,7 +193,7 @@ class TestKafkaClient(unittest.TestCase):
         # Topic with partition metadata, but no leaders return True
         self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))

-    @patch('kafka.client.KafkaClient._get_conn')
+    @patch('kafka.SimpleClient._get_conn')
     @patch('kafka.client.KafkaProtocol.decode_metadata_response')
     def test_ensure_topic_exists(self, decode_metadata_response, conn):
@@ -214,7 +214,7 @@ class TestKafkaClient(unittest.TestCase):
         ]
         decode_metadata_response.return_value = MetadataResponse(brokers, topics)

-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])

         with self.assertRaises(UnknownTopicOrPartitionError):
             client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
@@ -225,7 +225,7 @@ class TestKafkaClient(unittest.TestCase):
         # This should not raise
         client.ensure_topic_exists('topic_noleaders', timeout=1)

-    @patch('kafka.client.KafkaClient._get_conn')
+    @patch('kafka.SimpleClient._get_conn')
     @patch('kafka.client.KafkaProtocol')
     def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
         "Get leader for partitions reload metadata if it is not available"
@@ -242,7 +242,7 @@ class TestKafkaClient(unittest.TestCase):
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])

         # topic metadata is loaded but empty
         self.assertDictEqual({}, client.topics_to_brokers)
@@ -263,7 +263,7 @@ class TestKafkaClient(unittest.TestCase):
             TopicPartition('topic_one_partition', 0): brokers[0]},
             client.topics_to_brokers)

-    @patch('kafka.client.KafkaClient._get_conn')
+    @patch('kafka.SimpleClient._get_conn')
     @patch('kafka.client.KafkaProtocol')
     def test_get_leader_for_unassigned_partitions(self, protocol, conn):
@@ -280,7 +280,7 @@ class TestKafkaClient(unittest.TestCase):
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])

         self.assertDictEqual({}, client.topics_to_brokers)
@@ -290,7 +290,7 @@ class TestKafkaClient(unittest.TestCase):
         with self.assertRaises(UnknownTopicOrPartitionError):
             client._get_leader_for_partition('topic_unknown', 0)

-    @patch('kafka.client.KafkaClient._get_conn')
+    @patch('kafka.SimpleClient._get_conn')
     @patch('kafka.client.KafkaProtocol')
     def test_get_leader_exceptions_when_noleader(self, protocol, conn):
@@ -309,7 +309,7 @@ class TestKafkaClient(unittest.TestCase):
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])
         self.assertDictEqual(
             {
                 TopicPartition('topic_noleader', 0): None,
@@ -337,7 +337,7 @@ class TestKafkaClient(unittest.TestCase):
         self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
         self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))

-    @patch.object(KafkaClient, '_get_conn')
+    @patch.object(SimpleClient, '_get_conn')
     @patch('kafka.client.KafkaProtocol')
     def test_send_produce_request_raises_when_noleader(self, protocol, conn):
         mock_conn(conn)
@@ -355,7 +355,7 @@ class TestKafkaClient(unittest.TestCase):
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])

         requests = [ProduceRequestPayload(
             "topic_noleader", 0,
@@ -364,7 +364,7 @@ class TestKafkaClient(unittest.TestCase):
         with self.assertRaises(LeaderNotAvailableError):
             client.send_produce_request(requests)

-    @patch('kafka.client.KafkaClient._get_conn')
+    @patch('kafka.SimpleClient._get_conn')
     @patch('kafka.client.KafkaProtocol')
     def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
@@ -380,7 +380,7 @@ class TestKafkaClient(unittest.TestCase):
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

-        client = KafkaClient(hosts=['broker_1:4567'])
+        client = SimpleClient(hosts=['broker_1:4567'])

         requests = [ProduceRequestPayload(
             "topic_doesnt_exist", 0,
@@ -403,9 +403,9 @@ class TestKafkaClient(unittest.TestCase):
             self.assertGreaterEqual(t.interval, 1.0)

     def test_correlation_rollover(self):
-        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
             big_num = 2**31 - 3
-            client = KafkaClient(hosts=[], correlation_id=big_num)
+            client = SimpleClient(hosts=[], correlation_id=big_num)
             self.assertEqual(big_num + 1, client._next_id())
             self.assertEqual(big_num + 2, client._next_id())
             self.assertEqual(0, client._next_id())

View File

@@ -7,7 +7,7 @@ import time
 import pytest
 import six

-from kafka import KafkaClient, SimpleProducer
+from kafka import SimpleClient, SimpleProducer
 from kafka.common import TopicPartition
 from kafka.conn import BrokerConnection, ConnectionStates
 from kafka.consumer.group import KafkaConsumer
@@ -47,7 +47,7 @@ def kafka_broker(version, zookeeper, request):
 @pytest.fixture
 def simple_client(kafka_broker):
     connect_str = 'localhost:' + str(kafka_broker.port)
-    return KafkaClient(connect_str)
+    return SimpleClient(connect_str)


 @pytest.fixture

View File

@@ -2,7 +2,7 @@ import logging
 import os
 import time

-from kafka import KafkaClient, SimpleConsumer, KeyedProducer
+from kafka import SimpleClient, SimpleConsumer, KeyedProducer
 from kafka.common import (
     TopicPartition, FailedPayloadsError, ConnectionError, RequestTimedOutError
 )
@@ -34,7 +34,7 @@ class TestFailover(KafkaIntegrationTestCase):
         self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]

         hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
-        self.client = KafkaClient(hosts, timeout=2)
+        self.client = SimpleClient(hosts, timeout=2)
         super(TestFailover, self).setUp()

     def tearDown(self):
@@ -214,7 +214,7 @@ class TestFailover(KafkaIntegrationTestCase):
         hosts = ','.join(['%s:%d' % (broker.host, broker.port)
                           for broker in self.brokers])

-        client = KafkaClient(hosts)
+        client = SimpleClient(hosts)
         consumer = SimpleConsumer(client, None, topic,
                                   partitions=partitions,
                                   auto_commit=False,

View File

@@ -8,7 +8,7 @@ import time
 from mock import MagicMock, patch
 from . import unittest

-from kafka import KafkaClient, SimpleProducer, KeyedProducer
+from kafka import SimpleClient, SimpleProducer, KeyedProducer
 from kafka.common import (
     AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
     ProduceResponsePayload, RetryOptions, TopicPartition
@@ -89,11 +89,11 @@ class TestKafkaProducer(unittest.TestCase):
     def test_producer_sync_fail_on_error(self):
         error = FailedPayloadsError('failure')
-        with patch.object(KafkaClient, 'load_metadata_for_topics'):
-            with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
-                with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
+            with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+                with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):

-                    client = KafkaClient(MagicMock())
+                    client = SimpleClient(MagicMock())
                     producer = SimpleProducer(client, async=False, sync_fail_on_error=False)

                     # This should not raise

View File

@@ -10,7 +10,7 @@ import uuid
 from six.moves import xrange

 from . import unittest
-from kafka import KafkaClient
+from kafka import SimpleClient
 from kafka.common import OffsetRequestPayload

 __all__ = [
@@ -62,7 +62,7 @@ class KafkaIntegrationTestCase(unittest.TestCase):
         self.topic = topic

         if self.create_client:
-            self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
+            self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port))

         self.client.ensure_topic_exists(self.topic)