Fix Python 2.6 support
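The changes below replace the standard-library unittest with the unittest2 backport and rewrite set literals as set([...]), because Python 2.6's unittest has no skip/skipUnless decorators and set-literal syntax only appeared in Python 2.7. A minimal sketch of the compatibility idiom, for illustration only — the conditional import shown here is hypothetical; the tests in this commit simply import unittest2 directly:

    import sys

    if sys.version_info < (2, 7):
        # unittest2 backports the 2.7 unittest API (skip, skipUnless, ...) to 2.6
        import unittest2 as unittest
    else:
        import unittest

    class ExampleTest(unittest.TestCase):
        @unittest.skipUnless(hasattr(set, 'union'), "illustrative condition")
        def test_portable_set_construction(self):
            # set([...]) parses on 2.6; the {1, 2} literal form is 2.7+ only
            self.assertEqual(set([1, 2]), set((1, 2)))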
@@ -1,7 +1,7 @@
 import os
 import random
 import struct
-import unittest
+import unittest2

 from mock import MagicMock, patch

@@ -15,7 +15,7 @@ from kafka.protocol import (
     create_message, KafkaProtocol
 )

-class TestKafkaClient(unittest.TestCase):
+class TestKafkaClient(unittest2.TestCase):
     def test_init_with_list(self):
         with patch.object(KafkaClient, 'load_metadata_for_topics'):
             client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
@@ -2,7 +2,6 @@ import os
 import random
 import socket
 import time
-import unittest

 import kafka
 from kafka.common import *
@@ -1,18 +1,6 @@
-import os
-import random
 import struct
-import unittest
+import unittest2

-from mock import MagicMock, patch
-
-from kafka import KafkaClient
-from kafka.common import (
-    ProduceRequest, FetchRequest, Message, ChecksumError,
-    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
-    OffsetAndMessage, BrokerMetadata, PartitionMetadata,
-    TopicAndPartition, KafkaUnavailableError,
-    LeaderUnavailableError, PartitionUnavailableError
-)
 from kafka.codec import (
     has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
@@ -22,21 +10,21 @@ from kafka.protocol import (
 )
 from testutil import *

-class TestCodec(unittest.TestCase):
+class TestCodec(unittest2.TestCase):
     def test_gzip(self):
         for i in xrange(1000):
             s1 = random_string(100)
             s2 = gzip_decode(gzip_encode(s1))
             self.assertEquals(s1, s2)

-    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    @unittest2.skipUnless(has_snappy(), "Snappy not available")
     def test_snappy(self):
         for i in xrange(1000):
             s1 = random_string(100)
             s2 = snappy_decode(snappy_encode(s1))
             self.assertEquals(s1, s2)

-    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    @unittest2.skipUnless(has_snappy(), "Snappy not available")
     def test_snappy_detect_xerial(self):
         import kafka as kafka1
         _detect_xerial_stream = kafka1.codec._detect_xerial_stream
@@ -53,7 +41,7 @@ class TestCodec(unittest.TestCase):
         self.assertFalse(_detect_xerial_stream(random_snappy))
         self.assertFalse(_detect_xerial_stream(short_data))

-    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    @unittest2.skipUnless(has_snappy(), "Snappy not available")
     def test_snappy_decode_xerial(self):
         header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
         random_snappy = snappy_encode('SNAPPY' * 50)
@@ -67,7 +55,7 @@ class TestCodec(unittest.TestCase):

         self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))

-    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    @unittest2.skipUnless(has_snappy(), "Snappy not available")
     def test_snappy_encode_xerial(self):
         to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
             '\x00\x00\x00\x18' + \
@@ -1,10 +1,10 @@
 import os
 import random
 import struct
-import unittest
+import unittest2
 import kafka.conn

-class ConnTest(unittest.TestCase):
+class ConnTest(unittest2.TestCase):
     def test_collect_hosts__happy_path(self):
         hosts = "localhost:1234,localhost"
         results = kafka.conn.collect_hosts(hosts)
@@ -36,34 +36,34 @@ class ConnTest(unittest.TestCase):
             ('localhost', 9092),
         ]))

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_send(self):
         pass

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_send__reconnects_on_dirty_conn(self):
         pass

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_send__failure_sets_dirty_connection(self):
         pass

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_recv(self):
         pass

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_recv__reconnects_on_dirty_conn(self):
         pass

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_recv__failure_sets_dirty_connection(self):
         pass

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_recv__doesnt_consume_extra_data_in_stream(self):
         pass

-    @unittest.skip("Not Implemented")
+    @unittest2.skip("Not Implemented")
     def test_close__object_is_reusable(self):
         pass
@@ -1,5 +1,4 @@
 import os
-import unittest
 from datetime import datetime

 from kafka import * # noqa
@@ -1,6 +1,5 @@
 import os
 import time
-import unittest

 from kafka import * # noqa
 from kafka.common import * # noqa
@@ -1,9 +1,6 @@
-import os
-import random
-import struct
-import unittest
+import unittest2

-class TestPackage(unittest.TestCase):
+class TestPackage(unittest2.TestCase):
     def test_top_level_namespace(self):
         import kafka as kafka1
         self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
@@ -1,6 +1,5 @@
 import os
 import time
-import unittest
 import uuid

 from kafka import * # noqa
@@ -1,5 +1,5 @@
 import struct
-import unittest
+import unittest2

 from kafka import KafkaClient
 from kafka.common import (
@@ -19,7 +19,7 @@ from kafka.protocol import (
     create_gzip_message, create_message, create_snappy_message, KafkaProtocol
 )

-class TestProtocol(unittest.TestCase):
+class TestProtocol(unittest2.TestCase):
     def test_create_message(self):
         payload = "test"
         key = "key"
@@ -58,7 +58,7 @@ class TestProtocol(unittest.TestCase):

         self.assertEqual(decoded, expect)

-    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    @unittest2.skipUnless(has_snappy(), "Snappy not available")
     def test_create_snappy(self):
         payloads = ["v1", "v2"]
         msg = create_snappy_message(payloads)
@@ -216,7 +216,7 @@ class TestProtocol(unittest.TestCase):
         self.assertEqual(returned_offset2, 0)
         self.assertEqual(decoded_message2, create_message("v2"))

-    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    @unittest2.skipUnless(has_snappy(), "Snappy not available")
     def test_decode_message_snappy(self):
         snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
                           '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
@@ -567,10 +567,10 @@ class TestProtocol(unittest.TestCase):
         ])

         results = KafkaProtocol.decode_offset_response(encoded)
-        self.assertEqual(set(results), {
+        self.assertEqual(set(results), set([
             OffsetResponse(topic = 'topic1', partition = 2, error = 0, offsets=(4,)),
             OffsetResponse(topic = 'topic1', partition = 4, error = 0, offsets=(8,)),
-        })
+        ]))

     def test_encode_offset_commit_request(self):
         header = "".join([
@@ -629,10 +629,10 @@ class TestProtocol(unittest.TestCase):
         ])

         results = KafkaProtocol.decode_offset_commit_response(encoded)
-        self.assertEqual(set(results), {
+        self.assertEqual(set(results), set([
             OffsetCommitResponse(topic = 'topic1', partition = 2, error = 0),
             OffsetCommitResponse(topic = 'topic1', partition = 4, error = 0),
-        })
+        ]))

     def test_encode_offset_fetch_request(self):
         header = "".join([
@@ -688,7 +688,7 @@ class TestProtocol(unittest.TestCase):
         ])

         results = KafkaProtocol.decode_offset_fetch_response(encoded)
-        self.assertEqual(set(results), {
+        self.assertEqual(set(results), set([
             OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
             OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
-        })
+        ]))
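For context on the three hunks above: set-literal syntax ({...}) was added in Python 2.7, so under 2.6 the original assertions fail to even compile; set([...]) builds the same value on both versions. A tiny illustration, not taken from the diff (it runs on 2.7+, where both spellings parse):

    literal_form = {('topic1', 2), ('topic1', 4)}         # SyntaxError on Python 2.6
    portable_form = set([('topic1', 2), ('topic1', 4)])   # accepted on 2.6 and later
    assert literal_form == portable_form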
@@ -1,15 +1,18 @@
 import os
 import random
 import struct
-import unittest
+import unittest2
 import kafka.util

-class UtilTest(unittest.TestCase):
+class UtilTest(unittest2.TestCase):
+    @unittest2.skip("Unwritten")
     def test_relative_unpack(self):
         pass

+    @unittest2.skip("Unwritten")
     def test_write_int_string(self):
         pass

+    @unittest2.skip("Unwritten")
     def test_read_int_string(self):
         pass
@@ -5,7 +5,7 @@ import random
 import socket
 import string
 import time
-import unittest
+import unittest2
 import uuid

 from kafka.common import OffsetRequest
@@ -56,7 +56,7 @@ def get_open_port():
         sock.close()
         return port

-class KafkaIntegrationTestCase(unittest.TestCase):
+class KafkaIntegrationTestCase(unittest2.TestCase):
     create_client = True
     topic = None
