From 65ba8822b10e6f8a3ba4e9a6b0a1e6f9b785c18e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 14 Mar 2017 13:34:37 -0700 Subject: [PATCH] Derive all api classes from Request / Response base classes (#1030) --- kafka/client.py | 8 ++----- kafka/client_async.py | 7 +----- kafka/conn.py | 8 +++---- kafka/protocol/admin.py | 30 +++++++++++------------ kafka/protocol/api.py | 49 ++++++++++++++++++++++++++++++++++++++ kafka/protocol/commit.py | 30 +++++++++++------------ kafka/protocol/fetch.py | 18 +++++++------- kafka/protocol/group.py | 21 ++++++++-------- kafka/protocol/metadata.py | 14 +++++------ kafka/protocol/offset.py | 10 ++++---- kafka/protocol/produce.py | 29 ++++++++++++++++------ test/test_client_async.py | 5 ++-- test/test_conn.py | 5 ++-- 13 files changed, 146 insertions(+), 88 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 1f7c23b..c233ea6 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -257,18 +257,14 @@ class SimpleClient(object): continue request = encoder_fn(payloads=broker_payloads) - # decoder_fn=None signal that the server is expected to not - # send a response. This probably only applies to - # ProduceRequest w/ acks = 0 - expect_response = (decoder_fn is not None) - future = conn.send(request, expect_response=expect_response) + future = conn.send(request) if future.failed(): refresh_metadata = True failed_payloads(broker_payloads) continue - if not expect_response: + if not request.expect_response(): for payload in broker_payloads: topic_partition = (str(payload.topic), payload.partition) responses[topic_partition] = None diff --git a/kafka/client_async.py b/kafka/client_async.py index c0cdc43..2d711e4 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -464,12 +464,7 @@ class KafkaClient(object): if not self._maybe_connect(node_id): return Future().failure(Errors.NodeNotReadyError(node_id)) - # Every request gets a response, except one special case: - expect_response = True - if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0: - expect_response = False - - return self._conns[node_id].send(request, expect_response=expect_response) + return self._conns[node_id].send(request) def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): """Try to read and write to sockets. diff --git a/kafka/conn.py b/kafka/conn.py index 29f6911..d5b7c50 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -525,7 +525,7 @@ class BrokerConnection(object): ifr.future.failure(error) self.config['state_change_callback'](self) - def send(self, request, expect_response=True): + def send(self, request): """send request, return Future() Can block on network if request is larger than send_buffer_bytes @@ -537,9 +537,9 @@ class BrokerConnection(object): return future.failure(Errors.ConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) - return self._send(request, expect_response=expect_response) + return self._send(request) - def _send(self, request, expect_response=True): + def _send(self, request): assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) future = Future() correlation_id = self._next_correlation_id() @@ -569,7 +569,7 @@ class BrokerConnection(object): return future.failure(error) log.debug('%s Request %d: %s', self, correlation_id, request) - if expect_response: + if request.expect_response(): ifr = InFlightRequest(request=request, correlation_id=correlation_id, response_type=request.RESPONSE_TYPE, diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 89ea739..c5142b3 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -1,10 +1,10 @@ from __future__ import absolute_import -from .struct import Struct +from .api import Request, Response from .types import Array, Boolean, Bytes, Int16, Int32, Schema, String -class ApiVersionResponse_v0(Struct): +class ApiVersionResponse_v0(Response): API_KEY = 18 API_VERSION = 0 SCHEMA = Schema( @@ -16,7 +16,7 @@ class ApiVersionResponse_v0(Struct): ) -class ApiVersionRequest_v0(Struct): +class ApiVersionRequest_v0(Request): API_KEY = 18 API_VERSION = 0 RESPONSE_TYPE = ApiVersionResponse_v0 @@ -27,7 +27,7 @@ ApiVersionRequest = [ApiVersionRequest_v0] ApiVersionResponse = [ApiVersionResponse_v0] -class CreateTopicsResponse_v0(Struct): +class CreateTopicsResponse_v0(Response): API_KEY = 19 API_VERSION = 0 SCHEMA = Schema( @@ -37,7 +37,7 @@ class CreateTopicsResponse_v0(Struct): ) -class CreateTopicsResponse_v1(Struct): +class CreateTopicsResponse_v1(Response): API_KEY = 19 API_VERSION = 1 SCHEMA = Schema( @@ -48,7 +48,7 @@ class CreateTopicsResponse_v1(Struct): ) -class CreateTopicsRequest_v0(Struct): +class CreateTopicsRequest_v0(Request): API_KEY = 19 API_VERSION = 0 RESPONSE_TYPE = CreateTopicsResponse_v0 @@ -67,7 +67,7 @@ class CreateTopicsRequest_v0(Struct): ) -class CreateTopicsRequest_v1(Struct): +class CreateTopicsRequest_v1(Request): API_KEY = 19 API_VERSION = 1 RESPONSE_TYPE = CreateTopicsResponse_v1 @@ -91,7 +91,7 @@ CreateTopicsRequest = [CreateTopicsRequest_v0, CreateTopicsRequest_v1] CreateTopicsResponse = [CreateTopicsResponse_v0, CreateTopicsRequest_v1] -class DeleteTopicsResponse_v0(Struct): +class DeleteTopicsResponse_v0(Response): API_KEY = 20 API_VERSION = 0 SCHEMA = Schema( @@ -101,7 +101,7 @@ class DeleteTopicsResponse_v0(Struct): ) -class DeleteTopicsRequest_v0(Struct): +class DeleteTopicsRequest_v0(Request): API_KEY = 20 API_VERSION = 0 RESPONSE_TYPE = DeleteTopicsResponse_v0 @@ -115,7 +115,7 @@ DeleteTopicsRequest = [DeleteTopicsRequest_v0] DeleteTopicsResponse = [DeleteTopicsResponse_v0] -class ListGroupsResponse_v0(Struct): +class ListGroupsResponse_v0(Response): API_KEY = 16 API_VERSION = 0 SCHEMA = Schema( @@ -126,7 +126,7 @@ class ListGroupsResponse_v0(Struct): ) -class ListGroupsRequest_v0(Struct): +class ListGroupsRequest_v0(Request): API_KEY = 16 API_VERSION = 0 RESPONSE_TYPE = ListGroupsResponse_v0 @@ -137,7 +137,7 @@ ListGroupsRequest = [ListGroupsRequest_v0] ListGroupsResponse = [ListGroupsResponse_v0] -class DescribeGroupsResponse_v0(Struct): +class DescribeGroupsResponse_v0(Response): API_KEY = 15 API_VERSION = 0 SCHEMA = Schema( @@ -156,7 +156,7 @@ class DescribeGroupsResponse_v0(Struct): ) -class DescribeGroupsRequest_v0(Struct): +class DescribeGroupsRequest_v0(Request): API_KEY = 15 API_VERSION = 0 RESPONSE_TYPE = DescribeGroupsResponse_v0 @@ -169,7 +169,7 @@ DescribeGroupsRequest = [DescribeGroupsRequest_v0] DescribeGroupsResponse = [DescribeGroupsResponse_v0] -class SaslHandShakeResponse_v0(Struct): +class SaslHandShakeResponse_v0(Response): API_KEY = 17 API_VERSION = 0 SCHEMA = Schema( @@ -178,7 +178,7 @@ class SaslHandShakeResponse_v0(Struct): ) -class SaslHandShakeRequest_v0(Struct): +class SaslHandShakeRequest_v0(Request): API_KEY = 17 API_VERSION = 0 RESPONSE_TYPE = SaslHandShakeResponse_v0 diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index 7779aac..ec24a39 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +import abc + from .struct import Struct from .types import Int16, Int32, String, Schema @@ -16,3 +18,50 @@ class RequestHeader(Struct): super(RequestHeader, self).__init__( request.API_KEY, request.API_VERSION, correlation_id, client_id ) + + +class Request(Struct): + __metaclass__ = abc.ABCMeta + + @abc.abstractproperty + def API_KEY(self): + """Integer identifier for api request""" + pass + + @abc.abstractproperty + def API_VERSION(self): + """Integer of api request version""" + pass + + @abc.abstractproperty + def SCHEMA(self): + """An instance of Schema() representing the request structure""" + pass + + @abc.abstractproperty + def RESPONSE_TYPE(self): + """The Response class associated with the api request""" + pass + + def expect_response(self): + """Override this method if an api request does not always generate a response""" + return True + + +class Response(Struct): + __metaclass__ = abc.ABCMeta + + @abc.abstractproperty + def API_KEY(self): + """Integer identifier for api request/response""" + pass + + @abc.abstractproperty + def API_VERSION(self): + """Integer of api request/response version""" + pass + + @abc.abstractproperty + def SCHEMA(self): + """An instance of Schema() representing the response structure""" + pass diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 5645372..bcffe67 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -1,10 +1,10 @@ from __future__ import absolute_import -from .struct import Struct +from .api import Request, Response from .types import Array, Int16, Int32, Int64, Schema, String -class OffsetCommitResponse_v0(Struct): +class OffsetCommitResponse_v0(Response): API_KEY = 8 API_VERSION = 0 SCHEMA = Schema( @@ -16,19 +16,19 @@ class OffsetCommitResponse_v0(Struct): ) -class OffsetCommitResponse_v1(Struct): +class OffsetCommitResponse_v1(Response): API_KEY = 8 API_VERSION = 1 SCHEMA = OffsetCommitResponse_v0.SCHEMA -class OffsetCommitResponse_v2(Struct): +class OffsetCommitResponse_v2(Response): API_KEY = 8 API_VERSION = 2 SCHEMA = OffsetCommitResponse_v1.SCHEMA -class OffsetCommitRequest_v0(Struct): +class OffsetCommitRequest_v0(Request): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage RESPONSE_TYPE = OffsetCommitResponse_v0 @@ -43,7 +43,7 @@ class OffsetCommitRequest_v0(Struct): ) -class OffsetCommitRequest_v1(Struct): +class OffsetCommitRequest_v1(Request): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage RESPONSE_TYPE = OffsetCommitResponse_v1 @@ -61,7 +61,7 @@ class OffsetCommitRequest_v1(Struct): ) -class OffsetCommitRequest_v2(Struct): +class OffsetCommitRequest_v2(Request): API_KEY = 8 API_VERSION = 2 # added retention_time, dropped timestamp RESPONSE_TYPE = OffsetCommitResponse_v2 @@ -87,7 +87,7 @@ OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1, OffsetCommitResponse_v2] -class OffsetFetchResponse_v0(Struct): +class OffsetFetchResponse_v0(Response): API_KEY = 9 API_VERSION = 0 SCHEMA = Schema( @@ -101,13 +101,13 @@ class OffsetFetchResponse_v0(Struct): ) -class OffsetFetchResponse_v1(Struct): +class OffsetFetchResponse_v1(Response): API_KEY = 9 API_VERSION = 1 SCHEMA = OffsetFetchResponse_v0.SCHEMA -class OffsetFetchResponse_v2(Struct): +class OffsetFetchResponse_v2(Response): # Added in KIP-88 API_KEY = 9 API_VERSION = 2 @@ -123,7 +123,7 @@ class OffsetFetchResponse_v2(Struct): ) -class OffsetFetchRequest_v0(Struct): +class OffsetFetchRequest_v0(Request): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage RESPONSE_TYPE = OffsetFetchResponse_v0 @@ -135,14 +135,14 @@ class OffsetFetchRequest_v0(Struct): ) -class OffsetFetchRequest_v1(Struct): +class OffsetFetchRequest_v1(Request): API_KEY = 9 API_VERSION = 1 # kafka-backed storage RESPONSE_TYPE = OffsetFetchResponse_v1 SCHEMA = OffsetFetchRequest_v0.SCHEMA -class OffsetFetchRequest_v2(Struct): +class OffsetFetchRequest_v2(Request): # KIP-88: Allows passing null topics to return offsets for all partitions # that the consumer group has a stored offset for, even if no consumer in # the group is currently consuming that partition. @@ -158,7 +158,7 @@ OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1, OffsetFetchResponse_v2] -class GroupCoordinatorResponse_v0(Struct): +class GroupCoordinatorResponse_v0(Response): API_KEY = 10 API_VERSION = 0 SCHEMA = Schema( @@ -169,7 +169,7 @@ class GroupCoordinatorResponse_v0(Struct): ) -class GroupCoordinatorRequest_v0(Struct): +class GroupCoordinatorRequest_v0(Request): API_KEY = 10 API_VERSION = 0 RESPONSE_TYPE = GroupCoordinatorResponse_v0 diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index 6a9ad5b..b441e63 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -1,11 +1,11 @@ from __future__ import absolute_import +from .api import Request, Response from .message import MessageSet -from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class FetchResponse_v0(Struct): +class FetchResponse_v0(Response): API_KEY = 1 API_VERSION = 0 SCHEMA = Schema( @@ -19,7 +19,7 @@ class FetchResponse_v0(Struct): ) -class FetchResponse_v1(Struct): +class FetchResponse_v1(Response): API_KEY = 1 API_VERSION = 1 SCHEMA = Schema( @@ -34,19 +34,19 @@ class FetchResponse_v1(Struct): ) -class FetchResponse_v2(Struct): +class FetchResponse_v2(Response): API_KEY = 1 API_VERSION = 2 SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally -class FetchResponse_v3(Struct): +class FetchResponse_v3(Response): API_KEY = 1 API_VERSION = 3 SCHEMA = FetchResponse_v2.SCHEMA -class FetchRequest_v0(Struct): +class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 RESPONSE_TYPE = FetchResponse_v0 @@ -63,21 +63,21 @@ class FetchRequest_v0(Struct): ) -class FetchRequest_v1(Struct): +class FetchRequest_v1(Request): API_KEY = 1 API_VERSION = 1 RESPONSE_TYPE = FetchResponse_v1 SCHEMA = FetchRequest_v0.SCHEMA -class FetchRequest_v2(Struct): +class FetchRequest_v2(Request): API_KEY = 1 API_VERSION = 2 RESPONSE_TYPE = FetchResponse_v2 SCHEMA = FetchRequest_v1.SCHEMA -class FetchRequest_v3(Struct): +class FetchRequest_v3(Request): API_KEY = 1 API_VERSION = 3 RESPONSE_TYPE = FetchResponse_v3 diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 0e0b70e..5cab754 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -1,10 +1,11 @@ from __future__ import absolute_import +from .api import Request, Response from .struct import Struct from .types import Array, Bytes, Int16, Int32, Schema, String -class JoinGroupResponse_v0(Struct): +class JoinGroupResponse_v0(Response): API_KEY = 11 API_VERSION = 0 SCHEMA = Schema( @@ -19,13 +20,13 @@ class JoinGroupResponse_v0(Struct): ) -class JoinGroupResponse_v1(Struct): +class JoinGroupResponse_v1(Response): API_KEY = 11 API_VERSION = 1 SCHEMA = JoinGroupResponse_v0.SCHEMA -class JoinGroupRequest_v0(Struct): +class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 RESPONSE_TYPE = JoinGroupResponse_v0 @@ -41,7 +42,7 @@ class JoinGroupRequest_v0(Struct): UNKNOWN_MEMBER_ID = '' -class JoinGroupRequest_v1(Struct): +class JoinGroupRequest_v1(Request): API_KEY = 11 API_VERSION = 1 RESPONSE_TYPE = JoinGroupResponse_v1 @@ -70,7 +71,7 @@ class ProtocolMetadata(Struct): ) -class SyncGroupResponse_v0(Struct): +class SyncGroupResponse_v0(Response): API_KEY = 14 API_VERSION = 0 SCHEMA = Schema( @@ -79,7 +80,7 @@ class SyncGroupResponse_v0(Struct): ) -class SyncGroupRequest_v0(Struct): +class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 RESPONSE_TYPE = SyncGroupResponse_v0 @@ -107,7 +108,7 @@ class MemberAssignment(Struct): ) -class HeartbeatResponse_v0(Struct): +class HeartbeatResponse_v0(Response): API_KEY = 12 API_VERSION = 0 SCHEMA = Schema( @@ -115,7 +116,7 @@ class HeartbeatResponse_v0(Struct): ) -class HeartbeatRequest_v0(Struct): +class HeartbeatRequest_v0(Request): API_KEY = 12 API_VERSION = 0 RESPONSE_TYPE = HeartbeatResponse_v0 @@ -130,7 +131,7 @@ HeartbeatRequest = [HeartbeatRequest_v0] HeartbeatResponse = [HeartbeatResponse_v0] -class LeaveGroupResponse_v0(Struct): +class LeaveGroupResponse_v0(Response): API_KEY = 13 API_VERSION = 0 SCHEMA = Schema( @@ -138,7 +139,7 @@ class LeaveGroupResponse_v0(Struct): ) -class LeaveGroupRequest_v0(Struct): +class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 RESPONSE_TYPE = LeaveGroupResponse_v0 diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index e017c59..907ec25 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -1,10 +1,10 @@ from __future__ import absolute_import -from .struct import Struct +from .api import Request, Response from .types import Array, Boolean, Int16, Int32, Schema, String -class MetadataResponse_v0(Struct): +class MetadataResponse_v0(Response): API_KEY = 3 API_VERSION = 0 SCHEMA = Schema( @@ -24,7 +24,7 @@ class MetadataResponse_v0(Struct): ) -class MetadataResponse_v1(Struct): +class MetadataResponse_v1(Response): API_KEY = 3 API_VERSION = 1 SCHEMA = Schema( @@ -47,7 +47,7 @@ class MetadataResponse_v1(Struct): ) -class MetadataResponse_v2(Struct): +class MetadataResponse_v2(Response): API_KEY = 3 API_VERSION = 2 SCHEMA = Schema( @@ -71,7 +71,7 @@ class MetadataResponse_v2(Struct): ) -class MetadataRequest_v0(Struct): +class MetadataRequest_v0(Request): API_KEY = 3 API_VERSION = 0 RESPONSE_TYPE = MetadataResponse_v0 @@ -81,7 +81,7 @@ class MetadataRequest_v0(Struct): ALL_TOPICS = None # Empty Array (len 0) for topics returns all topics -class MetadataRequest_v1(Struct): +class MetadataRequest_v1(Request): API_KEY = 3 API_VERSION = 1 RESPONSE_TYPE = MetadataResponse_v1 @@ -90,7 +90,7 @@ class MetadataRequest_v1(Struct): NO_TOPICS = None # Empty array (len 0) for topics returns no topics -class MetadataRequest_v2(Struct): +class MetadataRequest_v2(Request): API_KEY = 3 API_VERSION = 2 RESPONSE_TYPE = MetadataResponse_v2 diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 5182d63..588dfec 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -1,6 +1,6 @@ from __future__ import absolute_import -from .struct import Struct +from .api import Request, Response from .types import Array, Int16, Int32, Int64, Schema, String @@ -10,7 +10,7 @@ class OffsetResetStrategy(object): NONE = 0 -class OffsetResponse_v0(Struct): +class OffsetResponse_v0(Response): API_KEY = 2 API_VERSION = 0 SCHEMA = Schema( @@ -22,7 +22,7 @@ class OffsetResponse_v0(Struct): ('offsets', Array(Int64)))))) ) -class OffsetResponse_v1(Struct): +class OffsetResponse_v1(Response): API_KEY = 2 API_VERSION = 1 SCHEMA = Schema( @@ -36,7 +36,7 @@ class OffsetResponse_v1(Struct): ) -class OffsetRequest_v0(Struct): +class OffsetRequest_v0(Request): API_KEY = 2 API_VERSION = 0 RESPONSE_TYPE = OffsetResponse_v0 @@ -53,7 +53,7 @@ class OffsetRequest_v0(Struct): 'replica_id': -1 } -class OffsetRequest_v1(Struct): +class OffsetRequest_v1(Request): API_KEY = 2 API_VERSION = 1 RESPONSE_TYPE = OffsetResponse_v1 diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index c1a519e..9b03354 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -1,11 +1,11 @@ from __future__ import absolute_import +from .api import Request, Response from .message import MessageSet -from .struct import Struct from .types import Int16, Int32, Int64, String, Array, Schema -class ProduceResponse_v0(Struct): +class ProduceResponse_v0(Response): API_KEY = 0 API_VERSION = 0 SCHEMA = Schema( @@ -18,7 +18,7 @@ class ProduceResponse_v0(Struct): ) -class ProduceResponse_v1(Struct): +class ProduceResponse_v1(Response): API_KEY = 0 API_VERSION = 1 SCHEMA = Schema( @@ -32,7 +32,7 @@ class ProduceResponse_v1(Struct): ) -class ProduceResponse_v2(Struct): +class ProduceResponse_v2(Response): API_KEY = 0 API_VERSION = 2 SCHEMA = Schema( @@ -47,7 +47,7 @@ class ProduceResponse_v2(Struct): ) -class ProduceRequest_v0(Struct): +class ProduceRequest_v0(Request): API_KEY = 0 API_VERSION = 0 RESPONSE_TYPE = ProduceResponse_v0 @@ -61,20 +61,35 @@ class ProduceRequest_v0(Struct): ('messages', MessageSet))))) ) + def expect_response(self): + if self.required_acks == 0: # pylint: disable=no-member + return False + return True -class ProduceRequest_v1(Struct): + +class ProduceRequest_v1(Request): API_KEY = 0 API_VERSION = 1 RESPONSE_TYPE = ProduceResponse_v1 SCHEMA = ProduceRequest_v0.SCHEMA + def expect_response(self): + if self.required_acks == 0: # pylint: disable=no-member + return False + return True -class ProduceRequest_v2(Struct): + +class ProduceRequest_v2(Request): API_KEY = 0 API_VERSION = 2 RESPONSE_TYPE = ProduceResponse_v2 SCHEMA = ProduceRequest_v1.SCHEMA + def expect_response(self): + if self.required_acks == 0: # pylint: disable=no-member + return False + return True + ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2] ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2] diff --git a/test/test_client_async.py b/test/test_client_async.py index 8874c67..97be827 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -236,13 +236,14 @@ def test_send(cli, conn): cli._maybe_connect(0) # ProduceRequest w/ 0 required_acks -> no response request = ProduceRequest[0](0, 0, []) + assert request.expect_response() is False ret = cli.send(0, request) - assert conn.send.called_with(request, expect_response=False) + assert conn.send.called_with(request) assert isinstance(ret, Future) request = MetadataRequest[0]([]) cli.send(0, request) - assert conn.send.called_with(request, expect_response=True) + assert conn.send.called_with(request) def test_poll(mocker): diff --git a/test/test_conn.py b/test/test_conn.py index 248ab88..2c418d4 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -11,6 +11,7 @@ import pytest from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts from kafka.protocol.api import RequestHeader from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.produce import ProduceRequest import kafka.common as Errors @@ -112,7 +113,7 @@ def test_send_max_ifr(conn): def test_send_no_response(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest[0]([]) + req = ProduceRequest[0](required_acks=0, timeout=0, topics=[]) header = RequestHeader(req, client_id=conn.config['client_id']) payload_bytes = len(header.encode()) + len(req.encode()) third = payload_bytes // 3 @@ -120,7 +121,7 @@ def test_send_no_response(_socket, conn): _socket.send.side_effect = [4, third, third, third, remainder] assert len(conn.in_flight_requests) == 0 - f = conn.send(req, expect_response=False) + f = conn.send(req) assert f.succeeded() is True assert f.value is None assert len(conn.in_flight_requests) == 0