Fix possible request draining in ensure_active_group (#896)

This commit is contained in:
Dana Powers
2016-12-02 18:25:28 -08:00
committed by GitHub
parent 9b59c5d755
commit 010ebb53a9
2 changed files with 16 additions and 12 deletions

View File

@@ -453,7 +453,7 @@ class KafkaClient(object):
return self._conns[node_id].send(request, expect_response=expect_response) return self._conns[node_id].send(request, expect_response=expect_response)
def poll(self, timeout_ms=None, future=None, sleep=True): def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
"""Try to read and write to sockets. """Try to read and write to sockets.
This method will also attempt to complete node connections, refresh This method will also attempt to complete node connections, refresh
@@ -488,14 +488,15 @@ class KafkaClient(object):
metadata_timeout_ms = self._maybe_refresh_metadata() metadata_timeout_ms = self._maybe_refresh_metadata()
# Send scheduled tasks # Send scheduled tasks
for task, task_future in self._delayed_tasks.pop_ready(): if delayed_tasks:
try: for task, task_future in self._delayed_tasks.pop_ready():
result = task() try:
except Exception as e: result = task()
log.error("Task %s failed: %s", task, e) except Exception as e:
task_future.failure(e) log.error("Task %s failed: %s", task, e)
else: task_future.failure(e)
task_future.success(result) else:
task_future.success(result)
# If we got a future that is already done, don't block in _poll # If we got a future that is already done, don't block in _poll
if future and future.is_done: if future and future.is_done:

View File

@@ -246,9 +246,12 @@ class BaseCoordinator(object):
# This is important in particular to avoid resending a pending # This is important in particular to avoid resending a pending
# JoinGroup request. # JoinGroup request.
if self._client.in_flight_request_count(self.coordinator_id): if self._client.in_flight_request_count(self.coordinator_id):
while self._client.in_flight_request_count(self.coordinator_id): while not self.coordinator_unknown():
self._client.poll() self._client.poll(delayed_tasks=False)
continue if not self._client.in_flight_request_count(self.coordinator_id):
break
else:
continue
future = self._send_join_group_request() future = self._send_join_group_request()
self._client.poll(future=future) self._client.poll(future=future)