Mark old kafka.common structs as Deprecated; remove unused TopicMetadata
@@ -2,9 +2,8 @@ import inspect
import sys
from collections import namedtuple
###############
#   Structs   #
###############
# SimpleClient Payload Structs - Deprecated
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest",
@@ -57,29 +56,29 @@ OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload",
# Other useful structs
TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
BrokerMetadata = namedtuple("BrokerMetadata",
["nodeId", "host", "port"])
TopicMetadata = namedtuple("TopicMetadata",
["topic", "error", "partitions"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
["offset", "metadata"])
# Deprecated structs
OffsetAndMessage = namedtuple("OffsetAndMessage",
["offset", "message"])
Message = namedtuple("Message",
["magic", "attributes", "key", "value"])
TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
["offset", "metadata"])
# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
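The structs kept under "# Other useful structs" above are plain namedtuples, so instances are immutable and fields can be read by name or by position. A minimal usage sketch, assuming these names remain importable from kafka.common exactly as defined in this hunk (the values below are illustrative, not taken from the commit):

    from kafka.common import (
        TopicPartition, BrokerMetadata, PartitionMetadata, OffsetAndMetadata
    )

    tp = TopicPartition(topic="my-topic", partition=0)
    broker = BrokerMetadata(nodeId=1, host="localhost", port=9092)
    partition_info = PartitionMetadata(topic="my-topic", partition=0, leader=1,
                                       replicas=(1, 2), isr=(1,), error=0)
    committed = OffsetAndMetadata(offset=42, metadata="")

    # Like any namedtuple, fields are accessible by attribute or index.
    assert tp.topic == "my-topic" and tp[1] == 0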
@@ -8,11 +8,12 @@ from . import unittest
from kafka.codec import has_snappy, gzip_decode, snappy_decode
from kafka.common import (
OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
OffsetResponsePayload, OffsetCommitResponsePayload, OffsetFetchResponsePayload,
ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError,
ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage,
BrokerMetadata, TopicMetadata, PartitionMetadata,
OffsetRequestPayload, OffsetResponsePayload,
OffsetCommitRequestPayload, OffsetCommitResponsePayload,
OffsetFetchRequestPayload, OffsetFetchResponsePayload,
ProduceRequestPayload, ProduceResponsePayload,
FetchRequestPayload, FetchResponsePayload,
Message, ChecksumError, OffsetAndMessage, BrokerMetadata,
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
ProtocolError, ConsumerMetadataResponse
)
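Message and OffsetAndMessage, imported here, are the deprecated message containers defined in the kafka.common hunk above. A small sketch of constructing them for a test fixture, with made-up field values (magic 0, attributes 0, i.e. no compression):

    from kafka.common import Message, OffsetAndMessage

    msg = Message(magic=0, attributes=0, key=None, value=b"some payload")
    oam = OffsetAndMessage(offset=0, message=msg)
    assert oam.message.value == b"some payload"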
@@ -564,6 +565,7 @@ class TestProtocol(unittest.TestCase):
BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
]
'''
topic_partitions = [
TopicMetadata(b"topic1", 0, [
PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
@@ -577,6 +579,7 @@ class TestProtocol(unittest.TestCase):
topic_partitions)
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))
'''
def test_encode_consumer_metadata_request(self):
expected = b"".join([