implement sasl PLAIN mechanism

This commit is contained in:
Lars Jørgen Solberg
2016-07-25 08:40:38 +00:00
committed by Dana Powers
parent c693709aaf
commit 6b801a8d2e
6 changed files with 176 additions and 4 deletions

View File

@@ -70,6 +70,9 @@ class KafkaClient(object):
'selector': selectors.DefaultSelector, 'selector': selectors.DefaultSelector,
'metrics': None, 'metrics': None,
'metric_group_prefix': '', 'metric_group_prefix': '',
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
} }
API_VERSIONS = [ API_VERSIONS = [
(0, 10), (0, 10),
@@ -150,6 +153,13 @@ class KafkaClient(object):
metrics (kafka.metrics.Metrics): Optionally provide a metrics metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None. instance for capturing network IO stats. Default: None.
metric_group_prefix (str): Prefix for metric names. Default: '' metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): string picking sasl mechanism when security_protocol
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
Default: None
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
Defualt: None
""" """
self.config = copy.copy(self.DEFAULT_CONFIG) self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config: for key in self.config:

View File

@@ -15,6 +15,7 @@ from kafka.vendor import six
import kafka.errors as Errors import kafka.errors as Errors
from kafka.future import Future from kafka.future import Future
from kafka.protocol.api import RequestHeader from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32 from kafka.protocol.types import Int32
from kafka.version import __version__ from kafka.version import __version__
@@ -48,7 +49,7 @@ class ConnectionStates(object):
CONNECTING = '<connecting>' CONNECTING = '<connecting>'
HANDSHAKE = '<handshake>' HANDSHAKE = '<handshake>'
CONNECTED = '<connected>' CONNECTED = '<connected>'
AUTHENTICATING = '<authenticating>'
InFlightRequest = collections.namedtuple('InFlightRequest', InFlightRequest = collections.namedtuple('InFlightRequest',
['request', 'response_type', 'correlation_id', 'future', 'timestamp']) ['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
@@ -73,6 +74,9 @@ class BrokerConnection(object):
'ssl_password': None, 'ssl_password': None,
'api_version': (0, 8, 2), # default to most restrictive 'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True, 'state_change_callback': lambda conn: True,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None
} }
def __init__(self, host, port, afi, **configs): def __init__(self, host, port, afi, **configs):
@@ -188,6 +192,8 @@ class BrokerConnection(object):
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', str(self)) log.debug('%s: initiating SSL handshake', str(self))
self.state = ConnectionStates.HANDSHAKE self.state = ConnectionStates.HANDSHAKE
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
self.state = ConnectionStates.AUTHENTICATING
else: else:
self.state = ConnectionStates.CONNECTED self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self) self.config['state_change_callback'](self)
@@ -211,6 +217,15 @@ class BrokerConnection(object):
if self.state is ConnectionStates.HANDSHAKE: if self.state is ConnectionStates.HANDSHAKE:
if self._try_handshake(): if self._try_handshake():
log.debug('%s: completed SSL handshake.', str(self)) log.debug('%s: completed SSL handshake.', str(self))
if self.config['security_protocol'] == 'SASL_SSL':
self.state = ConnectionStates.AUTHENTICATING
else:
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
if self.state is ConnectionStates.AUTHENTICATING:
if self._try_authenticate():
log.debug('%s: Authenticated as %s', str(self), self.config['sasl_plain_username'])
self.state = ConnectionStates.CONNECTED self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self) self.config['state_change_callback'](self)
@@ -273,6 +288,90 @@ class BrokerConnection(object):
return False return False
def _try_authenticate(self):
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.warning('%s: Sending username and password in the clear', str(self))
# Build a SaslHandShakeRequest message
correlation_id = self._next_correlation_id()
request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
header = RequestHeader(request,
correlation_id=correlation_id,
client_id=self.config['client_id'])
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
# Attempt to send it over our socket
try:
self._sock.setblocking(True)
self._sock.sendall(size + message)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError("%s: %s" % (str(self), e))
self.close(error=error)
return False
future = Future()
ifr = InFlightRequest(request=request,
correlation_id=correlation_id,
response_type=request.RESPONSE_TYPE,
future=future,
timestamp=time.time())
self.in_flight_requests.append(ifr)
# Listen for a reply and check that the server supports the PLAIN mechanism
response = None
while not response:
response = self.recv()
if not response.error_code is 0:
raise Errors.for_code(response.error_code)
if not self.config['sasl_mechanism'] in response.enabled_mechanisms:
raise Errors.AuthenticationMethodNotSupported(self.config['sasl_mechanism'] + " is not supported by broker")
return self._try_authenticate_plain()
def _try_authenticate_plain(self):
data = b''
try:
self._sock.setblocking(True)
# Send our credentials
msg = bytes('\0'.join([self.config['sasl_plain_username'],
self.config['sasl_plain_username'],
self.config['sasl_plain_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
received_bytes = 0
while received_bytes < 4:
data = data + self._sock.recv(4 - received_bytes)
received_bytes = received_bytes + len(data)
if not data:
log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
self.close(error=Errors.ConnectionError('Authentication failed'))
raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_plain_username']))
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (str(self), e))
self.close(error=error)
return False
with io.BytesIO() as buffer:
buffer.write(data)
buffer.seek(0)
if not Int32.decode(buffer) == 0:
raise Errors.KafkaError('Expected a zero sized reply after sending credentials')
return True
def blacked_out(self): def blacked_out(self):
""" """
Return true if we are disconnected from the given node and can't Return true if we are disconnected from the given node and can't
@@ -292,7 +391,8 @@ class BrokerConnection(object):
"""Returns True if still connecting (this may encompass several """Returns True if still connecting (this may encompass several
different states, such as SSL handshake, authorization, etc).""" different states, such as SSL handshake, authorization, etc)."""
return self.state in (ConnectionStates.CONNECTING, return self.state in (ConnectionStates.CONNECTING,
ConnectionStates.HANDSHAKE) ConnectionStates.HANDSHAKE,
ConnectionStates.AUTHENTICATING)
def disconnected(self): def disconnected(self):
"""Return True iff socket is closed""" """Return True iff socket is closed"""
@@ -385,7 +485,7 @@ class BrokerConnection(object):
Return response if available Return response if available
""" """
assert not self._processing, 'Recursion not supported' assert not self._processing, 'Recursion not supported'
if not self.connected(): if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
log.warning('%s cannot recv: socket not connected', self) log.warning('%s cannot recv: socket not connected', self)
# If requests are pending, we should close the socket and # If requests are pending, we should close the socket and
# fail all the pending request futures # fail all the pending request futures

View File

@@ -186,6 +186,13 @@ class KafkaConsumer(six.Iterator):
(such as offsets) should be exposed to the consumer. If set to True (such as offsets) should be exposed to the consumer. If set to True
the only way to receive records from an internal topic is the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+ Default: True subscribing to it. Requires 0.10+ Default: True
sasl_mechanism (str): string picking sasl mechanism when security_protocol
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
Default: None
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
Defualt: None
Note: Note:
Configuration parameters are described in more detail at Configuration parameters are described in more detail at
@@ -234,6 +241,9 @@ class KafkaConsumer(six.Iterator):
'metrics_sample_window_ms': 30000, 'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector, 'selector': selectors.DefaultSelector,
'exclude_internal_topics': True, 'exclude_internal_topics': True,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
} }
def __init__(self, *topics, **configs): def __init__(self, *topics, **configs):

View File

@@ -58,6 +58,14 @@ class CommitFailedError(KafkaError):
pass pass
class AuthenticationMethodNotSupported(KafkaError):
pass
class AuthenticationFailedError(KafkaError):
retriable = False
class BrokerResponseError(KafkaError): class BrokerResponseError(KafkaError):
errno = None errno = None
message = None message = None
@@ -328,6 +336,18 @@ class InvalidTimestampError(BrokerResponseError):
description = ('The timestamp of the message is out of acceptable range.') description = ('The timestamp of the message is out of acceptable range.')
class UnsupportedSaslMechanismError(BrokerResponseError):
errno = 33
message = 'UNSUPPORTED_SASL_MECHANISM'
description = ('The broker does not support the requested SASL mechanism.')
class IllegalSaslStateError(BrokerResponseError):
errno = 34
message = 'ILLEGAL_SASL_STATE'
description = ('Request is not valid given the current SASL state.')
class KafkaUnavailableError(KafkaError): class KafkaUnavailableError(KafkaError):
pass pass

View File

@@ -199,7 +199,8 @@ class KafkaProducer(object):
to kafka brokers up to this number of maximum requests per to kafka brokers up to this number of maximum requests per
broker connection. Default: 5. broker connection. Default: 5.
security_protocol (str): Protocol used to communicate with brokers. security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None. will be ignored. Default: None.
@@ -235,6 +236,13 @@ class KafkaProducer(object):
selector (selectors.BaseSelector): Provide a specific selector selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing. implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector Default: selectors.DefaultSelector
sasl_mechanism (str): string picking sasl mechanism when security_protocol
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
Default: None
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
Defualt: None
Note: Note:
Configuration parameters are described in more detail at Configuration parameters are described in more detail at
@@ -276,6 +284,9 @@ class KafkaProducer(object):
'metrics_num_samples': 2, 'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000, 'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector, 'selector': selectors.DefaultSelector,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
} }
def __init__(self, **configs): def __init__(self, **configs):

View File

@@ -78,3 +78,24 @@ class DescribeGroupsRequest_v0(Struct):
DescribeGroupsRequest = [DescribeGroupsRequest_v0] DescribeGroupsRequest = [DescribeGroupsRequest_v0]
DescribeGroupsResponse = [DescribeGroupsResponse_v0] DescribeGroupsResponse = [DescribeGroupsResponse_v0]
class SaslHandShakeResponse_v0(Struct):
API_KEY = 17
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('enabled_mechanisms', Array(String('utf-8')))
)
class SaslHandShakeRequest_v0(Struct):
API_KEY = 17
API_VERSION = 0
RESPONSE_TYPE = SaslHandShakeResponse_v0
SCHEMA = Schema(
('mechanism', String('utf-8'))
)
SaslHandShakeRequest = [SaslHandShakeRequest_v0]
SaslHandShakeResponse = [SaslHandShakeResponse_v0]