Use socketpair for KafkaClient wake pipe windows compatibility

This commit is contained in:
Dana Powers
2016-03-17 11:04:01 -07:00
parent 82c3e371c9
commit 16a013e207
2 changed files with 65 additions and 6 deletions

View File

@@ -4,9 +4,9 @@ import copy
import heapq import heapq
import itertools import itertools
import logging import logging
import os
import random import random
import select import select
import socket
import time import time
import six import six
@@ -18,6 +18,7 @@ from .conn import BrokerConnection, ConnectionStates, collect_hosts
from .future import Future from .future import Future
from .protocol.metadata import MetadataRequest from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest from .protocol.produce import ProduceRequest
from . import socketpair
from .version import __version__ from .version import __version__
if six.PY2: if six.PY2:
@@ -97,11 +98,11 @@ class KafkaClient(object):
self._last_bootstrap = 0 self._last_bootstrap = 0
self._bootstrap_fails = 0 self._bootstrap_fails = 0
self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = os.pipe() self._wake_r, self._wake_w = socket.socketpair()
def __del__(self): def __del__(self):
os.close(self._wake_r) self._wake_r.close()
os.close(self._wake_w) self._wake_w.close()
def _bootstrap(self, hosts): def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails # Exponential backoff if bootstrap fails
@@ -674,14 +675,15 @@ class KafkaClient(object):
return version return version
def wakeup(self): def wakeup(self):
os.write(self._wake_w, b'x') if self._wake_w.send(b'x') != 1:
log.warning('Unable to send to wakeup socket!')
def _clear_wake_fd(self): def _clear_wake_fd(self):
while True: while True:
fds, _, _ = select.select([self._wake_r], [], [], 0) fds, _, _ = select.select([self._wake_r], [], [], 0)
if not fds: if not fds:
break break
os.read(self._wake_r, 1) self._wake_r.recv(1)
class DelayedTaskQueue(object): class DelayedTaskQueue(object):

57
kafka/socketpair.py Normal file
View File

@@ -0,0 +1,57 @@
# pylint: skip-file
# vendored from https://github.com/mhils/backports.socketpair
import sys
import socket
import errno
_LOCALHOST = '127.0.0.1'
_LOCALHOST_V6 = '::1'
if not hasattr(socket, "socketpair"):
# Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
if family == socket.AF_INET:
host = _LOCALHOST
elif family == socket.AF_INET6:
host = _LOCALHOST_V6
else:
raise ValueError("Only AF_INET and AF_INET6 socket address families "
"are supported")
if type != socket.SOCK_STREAM:
raise ValueError("Only SOCK_STREAM socket type is supported")
if proto != 0:
raise ValueError("Only protocol zero is supported")
# We create a connected TCP socket. Note the trick with
# setblocking(False) that prevents us from having to create a thread.
lsock = socket.socket(family, type, proto)
try:
lsock.bind((host, 0))
lsock.listen(min(socket.SOMAXCONN, 128))
# On IPv6, ignore flow_info and scope_id
addr, port = lsock.getsockname()[:2]
csock = socket.socket(family, type, proto)
try:
csock.setblocking(False)
if sys.version_info >= (3, 0):
try:
csock.connect((addr, port))
except (BlockingIOError, InterruptedError):
pass
else:
try:
csock.connect((addr, port))
except socket.error as e:
if e.errno != errno.WSAEWOULDBLOCK:
raise
csock.setblocking(True)
ssock, _ = lsock.accept()
except:
csock.close()
raise
finally:
lsock.close()
return (ssock, csock)
socket.socketpair = socketpair