Merge pull request #621 from dpkp/ssl_support

Support SSL connections
This commit is contained in:
Dana Powers
2016-04-09 10:29:08 -07:00
10 changed files with 216 additions and 16 deletions

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@ dist
MANIFEST
env
servers/*/kafka-bin*
servers/*/resources/ssl*
.coverage*
.noseids
docs/_build

View File

@@ -53,6 +53,12 @@ class KafkaClient(object):
'send_buffer_bytes': None,
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
}
def __init__(self, **configs):
@@ -90,6 +96,21 @@ class KafkaClient(object):
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
ssl_check_hostname (bool): Flag to configure whether the SSL handshake
should verify that the certificate matches the broker's hostname.
Default: True.
ssl_cafile (str): Optional filename of the CA file to use in certificate
verification. Default: None.
ssl_certfile (str): Optional filename of a file in PEM format containing
the client certificate, as well as any CA certificates needed to
establish the certificate's authenticity. Default: None.
ssl_keyfile (str): Optional filename containing the client private key.
Default: None.
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -168,8 +189,10 @@ class KafkaClient(object):
def _conn_state_change(self, node_id, conn):
if conn.connecting():
self._connecting.add(node_id)
self._selector.register(conn._sock, selectors.EVENT_WRITE)
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
self._selector.register(conn._sock, selectors.EVENT_WRITE)
elif conn.connected():
log.debug("Node %s connected", node_id)
@@ -412,7 +435,9 @@ class KafkaClient(object):
def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
assert self.in_flight_request_count() > 0 or self._connecting or sleep
responses = []
processed = set()
for key, events in self._selector.select(timeout):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
@@ -420,6 +445,7 @@ class KafkaClient(object):
elif not (events & selectors.EVENT_READ):
continue
conn = key.data
processed.add(conn)
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -428,6 +454,15 @@ class KafkaClient(object):
if not response:
break
responses.append(response)
# Check for additional pending SSL bytes
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
# TODO: optimize
for conn in self._conns.values():
if conn not in processed and conn.connected() and conn._sock.pending():
response = conn.recv()
if response:
responses.append(response)
return responses
def in_flight_request_count(self, node_id=None):

View File

@@ -5,6 +5,7 @@ import logging
import io
from random import shuffle
import socket
import ssl
import struct
from threading import local
import time
@@ -29,11 +30,25 @@ log = logging.getLogger(__name__)
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092
# Support older ssl libraries that lack the fine-grained SSLError subclasses.
# The presence check uses hasattr() rather than `assert`: assert statements
# are stripped when Python runs with -O, which would silently skip the
# fallback and leave later `except ssl.SSLWantReadError` clauses raising
# AttributeError on old ssl modules.
_SSL_ERROR_NAMES = ('SSLWantReadError', 'SSLWantWriteError', 'SSLZeroReturnError')
if not all(hasattr(ssl, _name) for _name in _SSL_ERROR_NAMES):
    log.warning('old ssl module detected.'
                ' ssl error handling may not operate cleanly.'
                ' Consider upgrading to python 3.5 or 2.7')
    # Alias every specialised error to the generic SSLError so that the
    # except clauses elsewhere in this module still resolve.
    for _name in _SSL_ERROR_NAMES:
        setattr(ssl, _name, ssl.SSLError)
class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
HANDSHAKE = '<handshake>'
CONNECTED = '<connected>'
@@ -49,6 +64,12 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
}
@@ -66,6 +87,9 @@ class BrokerConnection(object):
self.state = ConnectionStates.DISCONNECTED
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
self._rbuffer = io.BytesIO()
self._receiving = False
self._next_payload_bytes = 0
@@ -87,6 +111,8 @@ class BrokerConnection(object):
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes'])
self._sock.setblocking(False)
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
self.config['state_change_callback'](self)
@@ -103,7 +129,11 @@ class BrokerConnection(object):
# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', str(self))
self.state = ConnectionStates.CONNECTED
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', str(self))
self.state = ConnectionStates.HANDSHAKE
else:
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
# Connection failed
@@ -122,8 +152,60 @@ class BrokerConnection(object):
else:
pass
if self.state is ConnectionStates.HANDSHAKE:
if self._try_handshake():
log.debug('%s: completed SSL handshake.', str(self))
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
return self.state
def _wrap_ssl(self):
    """Wrap ``self._sock`` in an SSL socket; the handshake is not run here.

    If no SSLContext has been stored on the connection yet, a default one
    is built from the ``ssl_*`` configuration values: SSLv2 and SSLv3 are
    disabled, hostname checking is enabled when ``ssl_check_hostname`` is
    set, the CA file (if any) is loaded and makes verification mandatory,
    and the client cert/key pair is loaded when both are configured.

    On ``ssl.SSLError`` while wrapping, the error is logged, the
    connection is closed and ``last_failure`` is set to the current time.
    """
    cfg = self.config
    assert cfg['security_protocol'] in ('SSL', 'SASL_SSL')

    if self._ssl_context is None:
        log.debug('%s: configuring default SSL Context', str(self))
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)  # pylint: disable=no-member
        # Store immediately so the context is attached to the connection
        # even if one of the load_* calls below raises.
        self._ssl_context = context
        context.options |= ssl.OP_NO_SSLv2  # pylint: disable=no-member
        context.options |= ssl.OP_NO_SSLv3  # pylint: disable=no-member
        context.verify_mode = ssl.CERT_OPTIONAL
        if cfg['ssl_check_hostname']:
            context.check_hostname = True

        ca_path = cfg['ssl_cafile']
        if ca_path:
            log.info('%s: Loading SSL CA from %s', str(self), ca_path)
            context.load_verify_locations(ca_path)
            # A configured CA file upgrades verification to mandatory
            context.verify_mode = ssl.CERT_REQUIRED

        cert_path = cfg['ssl_certfile']
        key_path = cfg['ssl_keyfile']
        if cert_path and key_path:
            log.info('%s: Loading SSL Cert from %s', str(self), cert_path)
            log.info('%s: Loading SSL Key from %s', str(self), key_path)
            context.load_cert_chain(certfile=cert_path, keyfile=key_path)

    log.debug('%s: wrapping socket in ssl context', str(self))
    try:
        # Handshake is deferred (do_handshake_on_connect=False)
        wrapped = self._ssl_context.wrap_socket(
            self._sock,
            server_hostname=self.host,
            do_handshake_on_connect=False)
    except ssl.SSLError:
        log.exception('%s: Failed to wrap socket in SSLContext!', str(self))
        self.close()
        self.last_failure = time.time()
    else:
        self._sock = wrapped
def _try_handshake(self):
    """Make one attempt at the SSL handshake on ``self._sock``.

    Returns:
        bool: True when ``do_handshake()`` returned without error. False
            when the handshake raised a want-read / want-write error, or
            when the server closed the connection (in which case this
            connection is closed as well).

    Any other ``ssl.SSLError`` is not caught and propagates to the caller.
    """
    assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
    completed = False
    try:
        self._sock.do_handshake()
    # old ssl in python2.6 will swallow all SSLErrors here...
    except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
        # Handshake not finished yet; report False without closing.
        pass
    except ssl.SSLZeroReturnError:
        log.warning('SSL connection closed by server during handshake.')
        self.close()
    else:
        completed = True
    # Other SSLErrors will be raised to user
    return completed
def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
@@ -140,8 +222,10 @@ class BrokerConnection(object):
return self.state is ConnectionStates.CONNECTED
def connecting(self):
"""Return True iff socket is in intermediate connecting state."""
return self.state is ConnectionStates.CONNECTING
"""Returns True if still connecting (this may encompass several
different states, such as SSL handshake, authorization, etc)."""
return self.state in (ConnectionStates.CONNECTING,
ConnectionStates.HANDSHAKE)
def disconnected(self):
"""Return True iff socket is closed"""
@@ -260,6 +344,8 @@ class BrokerConnection(object):
# An extremely small, but non-zero, probability that there are
# more than 0 but not yet 4 bytes available to read
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
except ssl.SSLWantReadError:
return None
except ConnectionError as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
return None
@@ -286,6 +372,8 @@ class BrokerConnection(object):
staged_bytes = self._rbuffer.tell()
try:
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
except ssl.SSLWantReadError:
return None
except ConnectionError as e:
# Extremely small chance that we have exactly 4 bytes for a
# header, but nothing to read in the body yet

View File

@@ -122,6 +122,21 @@ class KafkaConsumer(six.Iterator):
consumer_timeout_ms (int): number of milliseconds to throw a timeout
exception to the consumer if no message is available for
consumption. Default: -1 (don't throw exception)
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
ssl_check_hostname (bool): Flag to configure whether the SSL handshake
should verify that the certificate matches the broker's hostname.
Default: True.
ssl_cafile (str): Optional filename of the CA file to use in certificate
verification. Default: None.
ssl_certfile (str): Optional filename of a file in PEM format containing
the client certificate, as well as any CA certificates needed to
establish the certificate's authenticity. Default: None.
ssl_keyfile (str): Optional filename containing the client private key.
Default: None.
api_version (str): specify which kafka API version to use.
0.9 enables full group coordination features; 0.8.2 enables
kafka-storage offset commits; 0.8.1 enables zookeeper-storage
@@ -158,6 +173,12 @@ class KafkaConsumer(six.Iterator):
'send_buffer_bytes': None,
'receive_buffer_bytes': None,
'consumer_timeout_ms': -1,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
#'metric_reporters': None,

View File

@@ -192,6 +192,21 @@ class KafkaProducer(object):
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
ssl_check_hostname (bool): Flag to configure whether the SSL handshake
should verify that the certificate matches the broker's hostname.
Default: True.
ssl_cafile (str): Optional filename of the CA file to use in certificate
verification. Default: None.
ssl_certfile (str): Optional filename of a file in PEM format containing
the client certificate, as well as any CA certificates needed to
establish the certificate's authenticity. Default: None.
ssl_keyfile (str): Optional filename containing the client private key.
Default: None.
api_version (str): specify which kafka API version to use.
If set to 'auto', will attempt to infer the broker version by
probing various APIs. Default: auto
@@ -222,6 +237,12 @@ class KafkaProducer(object):
'send_buffer_bytes': None,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'api_version': 'auto',
}

View File

@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar
# The port the socket server listens on
port={port}
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from

View File

@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar
# The port the socket server listens on
port={port}
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from

View File

@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar
# The port the socket server listens on
port={port}
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from

View File

@@ -51,7 +51,8 @@ def conn(mocker):
return state
conn._set_conn_state = _set_conn_state
conn.connect.side_effect = lambda: conn.state
conn.connecting = lambda: conn.state is ConnectionStates.CONNECTING
conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING,
ConnectionStates.HANDSHAKE)
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn

View File

@@ -182,8 +182,8 @@ class ZookeeperFixture(Fixture):
class KafkaFixture(Fixture):
@classmethod
def instance(cls, broker_id, zk_host, zk_port,
zk_chroot=None, port=None, replicas=1, partitions=2):
def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, port=None,
transport='PLAINTEXT', replicas=1, partitions=2):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -194,16 +194,21 @@ class KafkaFixture(Fixture):
if port is None:
port = get_open_port()
host = "127.0.0.1"
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot,
fixture = KafkaFixture(host, port, broker_id,
zk_host, zk_port, zk_chroot,
transport=transport,
replicas=replicas, partitions=partitions)
fixture.open()
return fixture
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
replicas=1, partitions=2, transport='PLAINTEXT'):
self.host = host
self.port = port
self.broker_id = broker_id
self.transport = transport.upper()
self.ssl_dir = self.test_resource('ssl')
self.zk_host = zk_host
self.zk_port = zk_port
@@ -233,6 +238,7 @@ class KafkaFixture(Fixture):
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" transport = %s", self.transport)
log.info(" broker_id = %s", self.broker_id)
log.info(" zk_host = %s", self.zk_host)
log.info(" zk_port = %s", self.zk_port)