Merge conflict w/ assertEqual (assertEquals deprecated)
This commit is contained in:
@@ -32,12 +32,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
|
|||||||
fetch = FetchRequest(self.topic, 0, 0, 1024)
|
fetch = FetchRequest(self.topic, 0, 0, 1024)
|
||||||
|
|
||||||
fetch_resp, = self.client.send_fetch_request([fetch])
|
fetch_resp, = self.client.send_fetch_request([fetch])
|
||||||
self.assertEquals(fetch_resp.error, 0)
|
self.assertEqual(fetch_resp.error, 0)
|
||||||
self.assertEquals(fetch_resp.topic, self.topic)
|
self.assertEqual(fetch_resp.topic, self.topic)
|
||||||
self.assertEquals(fetch_resp.partition, 0)
|
self.assertEqual(fetch_resp.partition, 0)
|
||||||
|
|
||||||
messages = list(fetch_resp.messages)
|
messages = list(fetch_resp.messages)
|
||||||
self.assertEquals(len(messages), 0)
|
self.assertEqual(len(messages), 0)
|
||||||
|
|
||||||
@kafka_versions("all")
|
@kafka_versions("all")
|
||||||
def test_ensure_topic_exists(self):
|
def test_ensure_topic_exists(self):
|
||||||
@@ -58,10 +58,10 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
|
|||||||
def test_commit_fetch_offsets(self):
|
def test_commit_fetch_offsets(self):
|
||||||
req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
|
req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
|
||||||
(resp,) = self.client.send_offset_commit_request(b"group", [req])
|
(resp,) = self.client.send_offset_commit_request(b"group", [req])
|
||||||
self.assertEquals(resp.error, 0)
|
self.assertEqual(resp.error, 0)
|
||||||
|
|
||||||
req = OffsetFetchRequest(self.topic, 0)
|
req = OffsetFetchRequest(self.topic, 0)
|
||||||
(resp,) = self.client.send_offset_fetch_request(b"group", [req])
|
(resp,) = self.client.send_offset_fetch_request(b"group", [req])
|
||||||
self.assertEquals(resp.error, 0)
|
self.assertEqual(resp.error, 0)
|
||||||
self.assertEquals(resp.offset, 42)
|
self.assertEqual(resp.offset, 42)
|
||||||
self.assertEquals(resp.metadata, b"") # Metadata isn't stored for now
|
self.assertEqual(resp.metadata, b"") # Metadata isn't stored for now
|
||||||
|
@@ -15,14 +15,14 @@ class TestCodec(unittest.TestCase):
|
|||||||
for i in xrange(1000):
|
for i in xrange(1000):
|
||||||
s1 = random_string(100)
|
s1 = random_string(100)
|
||||||
s2 = gzip_decode(gzip_encode(s1))
|
s2 = gzip_decode(gzip_encode(s1))
|
||||||
self.assertEquals(s1, s2)
|
self.assertEqual(s1, s2)
|
||||||
|
|
||||||
@unittest.skipUnless(has_snappy(), "Snappy not available")
|
@unittest.skipUnless(has_snappy(), "Snappy not available")
|
||||||
def test_snappy(self):
|
def test_snappy(self):
|
||||||
for i in xrange(1000):
|
for i in xrange(1000):
|
||||||
s1 = random_string(100)
|
s1 = random_string(100)
|
||||||
s2 = snappy_decode(snappy_encode(s1))
|
s2 = snappy_decode(snappy_encode(s1))
|
||||||
self.assertEquals(s1, s2)
|
self.assertEqual(s1, s2)
|
||||||
|
|
||||||
@unittest.skipUnless(has_snappy(), "Snappy not available")
|
@unittest.skipUnless(has_snappy(), "Snappy not available")
|
||||||
def test_snappy_detect_xerial(self):
|
def test_snappy_detect_xerial(self):
|
||||||
@@ -53,7 +53,7 @@ class TestCodec(unittest.TestCase):
|
|||||||
+ struct.pack('!i', block_len) + random_snappy \
|
+ struct.pack('!i', block_len) + random_snappy \
|
||||||
+ struct.pack('!i', block_len2) + random_snappy2 \
|
+ struct.pack('!i', block_len2) + random_snappy2 \
|
||||||
|
|
||||||
self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
|
self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
|
||||||
|
|
||||||
@unittest.skipUnless(has_snappy(), "Snappy not available")
|
@unittest.skipUnless(has_snappy(), "Snappy not available")
|
||||||
def test_snappy_encode_xerial(self):
|
def test_snappy_encode_xerial(self):
|
||||||
@@ -68,5 +68,5 @@ class TestCodec(unittest.TestCase):
|
|||||||
to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
|
to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
|
||||||
|
|
||||||
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
|
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
|
||||||
self.assertEquals(compressed, to_ensure)
|
self.assertEqual(compressed, to_ensure)
|
||||||
|
|
||||||
|
@@ -120,7 +120,7 @@ class ConnTest(unittest.TestCase):
|
|||||||
|
|
||||||
def test_recv(self):
|
def test_recv(self):
|
||||||
|
|
||||||
self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
|
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
|
||||||
|
|
||||||
def test_recv__reconnects_on_dirty_conn(self):
|
def test_recv__reconnects_on_dirty_conn(self):
|
||||||
|
|
||||||
@@ -151,8 +151,8 @@ class ConnTest(unittest.TestCase):
|
|||||||
def test_recv__doesnt_consume_extra_data_in_stream(self):
|
def test_recv__doesnt_consume_extra_data_in_stream(self):
|
||||||
|
|
||||||
# Here just test that each call to recv will return a single payload
|
# Here just test that each call to recv will return a single payload
|
||||||
self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
|
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
|
||||||
self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload2'])
|
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
|
||||||
|
|
||||||
def test_close__object_is_reusable(self):
|
def test_close__object_is_reusable(self):
|
||||||
|
|
||||||
|
@@ -39,16 +39,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
|
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
|
||||||
produce = ProduceRequest(self.topic, partition, messages = messages)
|
produce = ProduceRequest(self.topic, partition, messages = messages)
|
||||||
resp, = self.client.send_produce_request([produce])
|
resp, = self.client.send_produce_request([produce])
|
||||||
self.assertEquals(resp.error, 0)
|
self.assertEqual(resp.error, 0)
|
||||||
|
|
||||||
return [ x.value for x in messages ]
|
return [ x.value for x in messages ]
|
||||||
|
|
||||||
def assert_message_count(self, messages, num_messages):
|
def assert_message_count(self, messages, num_messages):
|
||||||
# Make sure we got them all
|
# Make sure we got them all
|
||||||
self.assertEquals(len(messages), num_messages)
|
self.assertEqual(len(messages), num_messages)
|
||||||
|
|
||||||
# Make sure there are no duplicates
|
# Make sure there are no duplicates
|
||||||
self.assertEquals(len(set(messages)), num_messages)
|
self.assertEqual(len(set(messages)), num_messages)
|
||||||
|
|
||||||
def consumer(self, **kwargs):
|
def consumer(self, **kwargs):
|
||||||
if os.environ['KAFKA_VERSION'] == "0.8.0":
|
if os.environ['KAFKA_VERSION'] == "0.8.0":
|
||||||
@@ -140,9 +140,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
self.send_messages(0, range(0, 10))
|
self.send_messages(0, range(0, 10))
|
||||||
self.send_messages(1, range(10, 20))
|
self.send_messages(1, range(10, 20))
|
||||||
|
|
||||||
self.assertEquals(consumer.pending(), 20)
|
consumer = self.consumer()
|
||||||
self.assertEquals(consumer.pending(partitions=[0]), 10)
|
|
||||||
self.assertEquals(consumer.pending(partitions=[1]), 10)
|
self.assertEqual(consumer.pending(), 20)
|
||||||
|
self.assertEqual(consumer.pending(partitions=[0]), 10)
|
||||||
|
self.assertEqual(consumer.pending(partitions=[1]), 10)
|
||||||
|
|
||||||
# move to last message, so one partition should have 1 pending
|
# move to last message, so one partition should have 1 pending
|
||||||
# message and other 0
|
# message and other 0
|
||||||
@@ -201,9 +203,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
|
consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
|
||||||
|
|
||||||
self.assertEquals(consumer.pending(), 20)
|
self.assertEqual(consumer.pending(), 20)
|
||||||
self.assertEquals(consumer.pending(partitions=[0]), 10)
|
self.assertEqual(consumer.pending(partitions=[0]), 10)
|
||||||
self.assertEquals(consumer.pending(partitions=[1]), 10)
|
self.assertEqual(consumer.pending(partitions=[1]), 10)
|
||||||
|
|
||||||
consumer.stop()
|
consumer.stop()
|
||||||
|
|
||||||
@@ -251,7 +253,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
# Consume giant message successfully
|
# Consume giant message successfully
|
||||||
message = big_consumer.get_message(block=False, timeout=10)
|
message = big_consumer.get_message(block=False, timeout=10)
|
||||||
self.assertIsNotNone(message)
|
self.assertIsNotNone(message)
|
||||||
self.assertEquals(message.message.value, huge_message)
|
self.assertEqual(message.message.value, huge_message)
|
||||||
|
|
||||||
big_consumer.stop()
|
big_consumer.stop()
|
||||||
|
|
||||||
@@ -299,7 +301,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
|
consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
|
||||||
messages = [ message for message in consumer ]
|
messages = [ message for message in consumer ]
|
||||||
self.assertEquals(len(messages), 2)
|
self.assertEqual(len(messages), 2)
|
||||||
|
|
||||||
@kafka_versions("all")
|
@kafka_versions("all")
|
||||||
def test_kafka_consumer(self):
|
def test_kafka_consumer(self):
|
||||||
@@ -319,8 +321,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
if n >= 200:
|
if n >= 200:
|
||||||
break
|
break
|
||||||
|
|
||||||
self.assertEquals(len(messages[0]), 100)
|
self.assertEqual(len(messages[0]), 100)
|
||||||
self.assertEquals(len(messages[1]), 100)
|
self.assertEqual(len(messages[1]), 100)
|
||||||
|
|
||||||
@kafka_versions("all")
|
@kafka_versions("all")
|
||||||
def test_kafka_consumer__blocking(self):
|
def test_kafka_consumer__blocking(self):
|
||||||
|
@@ -121,7 +121,7 @@ class TestFailover(KafkaIntegrationTestCase):
|
|||||||
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
|
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
|
||||||
resp = producer.send_messages(topic, partition, random_string(10))
|
resp = producer.send_messages(topic, partition, random_string(10))
|
||||||
if len(resp) > 0:
|
if len(resp) > 0:
|
||||||
self.assertEquals(resp[0].error, 0)
|
self.assertEqual(resp[0].error, 0)
|
||||||
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
|
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
|
||||||
|
|
||||||
def _kill_leader(self, topic, partition):
|
def _kill_leader(self, topic, partition):
|
||||||
|
@@ -3,27 +3,27 @@ from . import unittest
|
|||||||
class TestPackage(unittest.TestCase):
|
class TestPackage(unittest.TestCase):
|
||||||
def test_top_level_namespace(self):
|
def test_top_level_namespace(self):
|
||||||
import kafka as kafka1
|
import kafka as kafka1
|
||||||
self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
|
self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient")
|
||||||
self.assertEquals(kafka1.client.__name__, "kafka.client")
|
self.assertEqual(kafka1.client.__name__, "kafka.client")
|
||||||
self.assertEquals(kafka1.codec.__name__, "kafka.codec")
|
self.assertEqual(kafka1.codec.__name__, "kafka.codec")
|
||||||
|
|
||||||
def test_submodule_namespace(self):
|
def test_submodule_namespace(self):
|
||||||
import kafka.client as client1
|
import kafka.client as client1
|
||||||
self.assertEquals(client1.__name__, "kafka.client")
|
self.assertEqual(client1.__name__, "kafka.client")
|
||||||
self.assertEquals(client1.KafkaClient.__name__, "KafkaClient")
|
self.assertEqual(client1.KafkaClient.__name__, "KafkaClient")
|
||||||
|
|
||||||
from kafka import client as client2
|
from kafka import client as client2
|
||||||
self.assertEquals(client2.__name__, "kafka.client")
|
self.assertEqual(client2.__name__, "kafka.client")
|
||||||
self.assertEquals(client2.KafkaClient.__name__, "KafkaClient")
|
self.assertEqual(client2.KafkaClient.__name__, "KafkaClient")
|
||||||
|
|
||||||
from kafka.client import KafkaClient as KafkaClient1
|
from kafka.client import KafkaClient as KafkaClient1
|
||||||
self.assertEquals(KafkaClient1.__name__, "KafkaClient")
|
self.assertEqual(KafkaClient1.__name__, "KafkaClient")
|
||||||
|
|
||||||
from kafka.codec import gzip_encode as gzip_encode1
|
from kafka.codec import gzip_encode as gzip_encode1
|
||||||
self.assertEquals(gzip_encode1.__name__, "gzip_encode")
|
self.assertEqual(gzip_encode1.__name__, "gzip_encode")
|
||||||
|
|
||||||
from kafka import KafkaClient as KafkaClient2
|
from kafka import KafkaClient as KafkaClient2
|
||||||
self.assertEquals(KafkaClient2.__name__, "KafkaClient")
|
self.assertEqual(KafkaClient2.__name__, "KafkaClient")
|
||||||
|
|
||||||
from kafka.codec import snappy_encode
|
from kafka.codec import snappy_encode
|
||||||
self.assertEquals(snappy_encode.__name__, "snappy_encode")
|
self.assertEqual(snappy_encode.__name__, "snappy_encode")
|
||||||
|
@@ -251,7 +251,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
|
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
|
||||||
resp = producer.send_messages(self.topic, self.msg("one"))
|
resp = producer.send_messages(self.topic, self.msg("one"))
|
||||||
self.assertEquals(len(resp), 0)
|
self.assertEqual(len(resp), 0)
|
||||||
|
|
||||||
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
||||||
producer.stop()
|
producer.stop()
|
||||||
@@ -301,7 +301,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Batch mode is async. No ack
|
# Batch mode is async. No ack
|
||||||
self.assertEquals(len(resp), 0)
|
self.assertEqual(len(resp), 0)
|
||||||
|
|
||||||
# It hasn't sent yet
|
# It hasn't sent yet
|
||||||
self.assert_fetch_offset(0, start_offset0, [])
|
self.assert_fetch_offset(0, start_offset0, [])
|
||||||
@@ -314,7 +314,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Batch mode is async. No ack
|
# Batch mode is async. No ack
|
||||||
self.assertEquals(len(resp), 0)
|
self.assertEqual(len(resp), 0)
|
||||||
|
|
||||||
self.assert_fetch_offset(0, start_offset0, [
|
self.assert_fetch_offset(0, start_offset0, [
|
||||||
self.msg("one"),
|
self.msg("one"),
|
||||||
@@ -350,7 +350,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Batch mode is async. No ack
|
# Batch mode is async. No ack
|
||||||
self.assertEquals(len(resp), 0)
|
self.assertEqual(len(resp), 0)
|
||||||
|
|
||||||
# It hasn't sent yet
|
# It hasn't sent yet
|
||||||
self.assert_fetch_offset(0, start_offset0, [])
|
self.assert_fetch_offset(0, start_offset0, [])
|
||||||
@@ -363,7 +363,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Batch mode is async. No ack
|
# Batch mode is async. No ack
|
||||||
self.assertEquals(len(resp), 0)
|
self.assertEqual(len(resp), 0)
|
||||||
|
|
||||||
# Wait the timeout out
|
# Wait the timeout out
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
@@ -389,7 +389,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
producer = SimpleProducer(self.client, async=True)
|
producer = SimpleProducer(self.client, async=True)
|
||||||
resp = producer.send_messages(self.topic, self.msg("one"))
|
resp = producer.send_messages(self.topic, self.msg("one"))
|
||||||
self.assertEquals(len(resp), 0)
|
self.assertEqual(len(resp), 0)
|
||||||
|
|
||||||
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
||||||
|
|
||||||
@@ -402,7 +402,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
|
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
|
||||||
|
|
||||||
resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
|
resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
|
||||||
self.assertEquals(len(resp), 0)
|
self.assertEqual(len(resp), 0)
|
||||||
|
|
||||||
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
||||||
|
|
||||||
@@ -429,9 +429,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
|
resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
|
||||||
|
|
||||||
self.assertEquals(resp.error, 0)
|
self.assertEqual(resp.error, 0)
|
||||||
self.assertEquals(resp.partition, partition)
|
self.assertEqual(resp.partition, partition)
|
||||||
messages = [ x.message.value for x in resp.messages ]
|
messages = [ x.message.value for x in resp.messages ]
|
||||||
|
|
||||||
self.assertEqual(messages, expected_messages)
|
self.assertEqual(messages, expected_messages)
|
||||||
self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages))
|
self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))
|
||||||
|
Reference in New Issue
Block a user