Add new FetchRequest v4/v5, MetadataRequest v3/v4

This commit is contained in:
Dana Powers
2017-06-19 10:11:53 -07:00
parent bbbac3dc36
commit 3a67f64168
2 changed files with 150 additions and 7 deletions

View File

@@ -2,7 +2,7 @@ from __future__ import absolute_import
from .api import Request, Response
from .message import MessageSet
from .types import Array, Int16, Int32, Int64, Schema, String
from .types import Array, Int8, Int16, Int32, Int64, Schema, String
class FetchResponse_v0(Response):
@@ -46,6 +46,45 @@ class FetchResponse_v3(Response):
SCHEMA = FetchResponse_v2.SCHEMA
class FetchResponse_v4(Response):
API_KEY = 1
API_VERSION = 4
SCHEMA = Schema(
('throttle_time_ms', Int32),
('topics', Array(
('topics', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
('last_stable_offset', Int64),
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
('message_set', MessageSet)))))
)
class FetchResponse_v5(Response):
API_KEY = 1
API_VERSION = 5
SCHEMA = Schema(
('throttle_time_ms', Int32),
('topics', Array(
('topics', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
('last_stable_offset', Int64),
('log_start_offset', Int64),
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
('message_set', MessageSet)))))
)
class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
@@ -95,7 +134,52 @@ class FetchRequest_v3(Request):
)
FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
FetchRequest_v3]
FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
FetchResponse_v3]
class FetchRequest_v4(Request):
"""Adds isolation_level field"""
API_KEY = 1
API_VERSION = 4
RESPONSE_TYPE = FetchResponse_v4
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('offset', Int64),
('max_bytes', Int32)))))
)
class FetchRequest_v5(Request):
"""This may only be used in broker-broker api calls"""
API_KEY = 1
API_VERSION = 5
RESPONSE_TYPE = FetchResponse_v5
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('fetch_offset', Int64),
('log_start_offset', Int64),
('max_bytes', Int32)))))
)
FetchRequest = [
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
]
FetchResponse = [
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
]

View File

@@ -71,6 +71,37 @@ class MetadataResponse_v2(Response):
)
class MetadataResponse_v3(Response):
API_KEY = 3
API_VERSION = 3
SCHEMA = Schema(
('throttle_time_ms', Int32),
('brokers', Array(
('node_id', Int32),
('host', String('utf-8')),
('port', Int32),
('rack', String('utf-8')))),
('cluster_id', String('utf-8')), # <-- Added cluster_id field in v2
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32))))))
)
class MetadataResponse_v4(Response):
API_KEY = 3
API_VERSION = 4
SCHEMA = MetadataResponse_v3.SCHEMA
class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
@@ -95,8 +126,36 @@ class MetadataRequest_v2(Request):
API_VERSION = 2
RESPONSE_TYPE = MetadataResponse_v2
SCHEMA = MetadataRequest_v1.SCHEMA
ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2]
class MetadataRequest_v3(Request):
API_KEY = 3
API_VERSION = 3
RESPONSE_TYPE = MetadataResponse_v3
SCHEMA = MetadataRequest_v1.SCHEMA
ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
class MetadataRequest_v4(Request):
API_KEY = 3
API_VERSION = 4
RESPONSE_TYPE = MetadataResponse_v4
SCHEMA = Schema(
('topics', Array(String('utf-8'))),
('allow_auto_topic_creation', Boolean)
)
ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
MetadataRequest = [
MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
MetadataRequest_v3, MetadataRequest_v4
]
MetadataResponse = [
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2]
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
MetadataResponse_v3, MetadataResponse_v4
]