Make _wake_r socket non-blocking; drop select from _clear_wake_fd
This commit is contained in:
@@ -98,6 +98,7 @@ class KafkaClient(object):
|
|||||||
self._bootstrap_fails = 0
|
self._bootstrap_fails = 0
|
||||||
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
||||||
self._wake_r, self._wake_w = socket.socketpair()
|
self._wake_r, self._wake_w = socket.socketpair()
|
||||||
|
self._wake_r.setblocking(False)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self._wake_r.close()
|
self._wake_r.close()
|
||||||
@@ -682,10 +683,10 @@ class KafkaClient(object):
|
|||||||
|
|
||||||
def _clear_wake_fd(self):
|
def _clear_wake_fd(self):
|
||||||
while True:
|
while True:
|
||||||
fds, _, _ = select.select([self._wake_r], [], [], 0)
|
try:
|
||||||
if not fds:
|
self._wake_r.recv(1)
|
||||||
|
except:
|
||||||
break
|
break
|
||||||
self._wake_r.recv(1)
|
|
||||||
|
|
||||||
|
|
||||||
class DelayedTaskQueue(object):
|
class DelayedTaskQueue(object):
|
||||||
|
|||||||
Reference in New Issue
Block a user