Add simple BrokerConnection class; add request.RESPONSE_TYPE class vars

This commit is contained in:
Dana Powers
2015-11-29 10:00:50 +08:00
committed by Zack Dever
parent a85e09df89
commit 058567912e
6 changed files with 128 additions and 85 deletions

View File

@@ -8,6 +8,8 @@ from threading import local
import six
from kafka.common import ConnectionError
from kafka.protocol.api import RequestHeader
from kafka.protocol.types import Int32
log = logging.getLogger(__name__)
@@ -16,6 +18,40 @@ DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092
class BrokerConnection(local):
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(BrokerConnection, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self._sock = socket.create_connection((host, port), timeout)
self.fd = self._sock.makefile(mode='+')
self.correlation_id = 0
def close(self):
self.fd.close()
self._sock.close()
def send(self, request):
self.correlation_id += 1
header = RequestHeader(request, correlation_id=self.correlation_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
self.fd.write(size)
self.fd.write(message)
self.fd.flush()
size = Int32.decode(self.fd)
correlation_id = Int32.decode(self.fd)
return request.RESPONSE_TYPE.decode(self.fd)
def __getnewargs__(self):
return (self.host, self.port, self.timeout)
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally

View File

@@ -2,9 +2,20 @@ from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
class OffsetCommitResponse(Struct):
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16)))))
)
class OffsetCommitRequest_v2(Struct):
API_KEY = 8
API_VERSION = 2 # added retention_time, dropped timestamp
RESPONSE_TYPE = OffsetCommitResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
@@ -22,6 +33,7 @@ class OffsetCommitRequest_v2(Struct):
class OffsetCommitRequest_v1(Struct):
API_KEY = 8
API_VERSION = 1 # Kafka-backed storage
RESPONSE_TYPE = OffsetCommitResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
@@ -39,6 +51,7 @@ class OffsetCommitRequest_v1(Struct):
class OffsetCommitRequest_v0(Struct):
API_KEY = 8
API_VERSION = 0 # Zookeeper-backed storage
RESPONSE_TYPE = OffsetCommitResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
@@ -50,38 +63,6 @@ class OffsetCommitRequest_v0(Struct):
)
class OffsetCommitResponse(Struct):
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16)))))
)
class OffsetFetchRequest_v1(Struct):
API_KEY = 9
API_VERSION = 1 # kafka-backed storage
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(Int32))))
)
class OffsetFetchRequest_v0(Struct):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(Int32))))
)
class OffsetFetchResponse(Struct):
SCHEMA = Schema(
('topics', Array(
@@ -94,11 +75,27 @@ class OffsetFetchResponse(Struct):
)
class GroupCoordinatorRequest(Struct):
API_KEY = 10
API_VERSION = 0
class OffsetFetchRequest_v1(Struct):
API_KEY = 9
API_VERSION = 1 # kafka-backed storage
RESPONSE_TYPE = OffsetFetchResponse
SCHEMA = Schema(
('consumer_group', String('utf-8'))
('consumer_group', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(Int32))))
)
class OffsetFetchRequest_v0(Struct):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
RESPONSE_TYPE = OffsetFetchResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(Int32))))
)
@@ -109,3 +106,12 @@ class GroupCoordinatorResponse(Struct):
('host', String('utf-8')),
('port', Int32)
)
class GroupCoordinatorRequest(Struct):
API_KEY = 10
API_VERSION = 0
RESPONSE_TYPE = GroupCoordinatorResponse
SCHEMA = Schema(
('consumer_group', String('utf-8'))
)

View File

@@ -3,21 +3,6 @@ from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
class FetchRequest(Struct):
API_KEY = 1
API_VERSION = 0
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('offset', Int64),
('max_bytes', Int32)))))
)
class FetchResponse(Struct):
SCHEMA = Schema(
('topics', Array(
@@ -28,3 +13,20 @@ class FetchResponse(Struct):
('highwater_offset', Int64),
('message_set', MessageSet)))))
)
class FetchRequest(Struct):
API_KEY = 1
API_VERSION = 0
RESPONSE_TYPE = FetchResponse
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('offset', Int64),
('max_bytes', Int32)))))
)

View File

@@ -2,14 +2,6 @@ from .struct import Struct
from .types import Array, Int16, Int32, Schema, String
class MetadataRequest(Struct):
API_KEY = 3
API_VERSION = 0
SCHEMA = Schema(
('topics', Array(String('utf-8')))
)
class MetadataResponse(Struct):
SCHEMA = Schema(
('brokers', Array(
@@ -26,3 +18,12 @@ class MetadataResponse(Struct):
('replicas', Array(Int32)),
('isr', Array(Int32))))))
)
class MetadataRequest(Struct):
API_KEY = 3
API_VERSION = 0
RESPONSE_TYPE = MetadataResponse
SCHEMA = Schema(
('topics', Array(String('utf-8')))
)

View File

@@ -2,9 +2,21 @@ from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
class OffsetResponse(Struct):
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offsets', Array(Int64))))))
)
class OffsetRequest(Struct):
API_KEY = 2
API_VERSION = 0
RESPONSE_TYPE = OffsetResponse
SCHEMA = Schema(
('replica_id', Int32),
('topics', Array(
@@ -17,16 +29,3 @@ class OffsetRequest(Struct):
DEFAULTS = {
'replica_id': -1
}
class OffsetResponse(Struct):
API_KEY = 2
API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offsets', Array(Int64))))))
)

View File

@@ -3,9 +3,21 @@ from .struct import Struct
from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema
class ProduceResponse(Struct):
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offset', Int64)))))
)
class ProduceRequest(Struct):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = ProduceResponse
SCHEMA = Schema(
('required_acks', Int16),
('timeout', Int32),
@@ -15,16 +27,3 @@ class ProduceRequest(Struct):
('partition', Int32),
('messages', MessageSet)))))
)
class ProduceResponse(Struct):
API_KEY = 0
API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offset', Int64)))))
)