implement sasl PLAIN mechanism

This commit is contained in:
Lars Jørgen Solberg
2016-07-25 08:40:38 +00:00
committed by Dana Powers
parent c693709aaf
commit 191a45a01e
5 changed files with 155 additions and 5 deletions

View File

@@ -70,6 +70,9 @@ class KafkaClient(object):
'selector': selectors.DefaultSelector,
'metrics': None,
'metric_group_prefix': '',
'sasl_mechanism': None,
'sasl_username': None,
'sasl_password': None,
}
API_VERSIONS = [
(0, 10),

View File

@@ -15,6 +15,7 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -48,7 +49,7 @@ class ConnectionStates(object):
CONNECTING = '<connecting>'
HANDSHAKE = '<handshake>'
CONNECTED = '<connected>'
AUTHENTICATING = '<authenticating>'
InFlightRequest = collections.namedtuple('InFlightRequest',
['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
@@ -73,6 +74,9 @@ class BrokerConnection(object):
'ssl_password': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
'sasl_mechanism': None,
'sasl_username': None,
'sasl_password': None
}
def __init__(self, host, port, afi, **configs):
@@ -188,6 +192,8 @@ class BrokerConnection(object):
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', str(self))
self.state = ConnectionStates.HANDSHAKE
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
self.state = ConnectionStates.AUTHENTICATING
else:
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
@@ -211,6 +217,15 @@ class BrokerConnection(object):
if self.state is ConnectionStates.HANDSHAKE:
if self._try_handshake():
log.debug('%s: completed SSL handshake.', str(self))
if self.config['security_protocol'] == 'SASL_SSL':
self.state = ConnectionStates.AUTHENTICATING
else:
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
if self.state is ConnectionStates.AUTHENTICATING:
if self._try_authenticate():
log.debug('%s: Authenticated as %s', str(self), self.config['sasl_username'])
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
@@ -273,6 +288,90 @@ class BrokerConnection(object):
return False
def _try_authenticate(self):
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.warning('%s: Sending username and password in the clear', str(self))
# build a SaslHandShakeRequest message
correlation_id = self._next_correlation_id()
request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
header = RequestHeader(request,
correlation_id=correlation_id,
client_id=self.config['client_id'])
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
# attempt to send it over our socket
try:
self._sock.setblocking(True)
self._sock.sendall(size + message)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError("%s: %s" % (str(self), e))
self.close(error=error)
return False
future = Future()
ifr = InFlightRequest(request=request,
correlation_id=correlation_id,
response_type=request.RESPONSE_TYPE,
future=future,
timestamp=time.time())
self.in_flight_requests.append(ifr)
# listen for a reply and check that the server supports the PLAIN mechanism
response = None
while not response:
response = self.recv()
if not response.error_code is 0:
raise for_code(response.error_code)
if not self.config['sasl_mechanism'] in response.enabled_mechanisms:
raise AuthenticationMethodNotSupported(self.config['sasl_mechanism'] + " is not supported by broker")
return self._try_authenticate_plain()
def _try_authenticate_plain(self):
data = b''
try:
self._sock.setblocking(True)
# send our credentials
msg = bytes('\0'.join([self.config['sasl_username'],
self.config['sasl_username'],
self.config['sasl_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
received_bytes = 0
while received_bytes < 4:
data = data + self._sock.recv(4 - received_bytes)
received_bytes = received_bytes + len(data)
if not data:
log.error('%s: Authentication failed for user %s', self, self.config['sasl_username'])
self.close(error=Errors.ConnectionError('Authentication failed'))
raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_username']))
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (str(self), e))
self.close(error=error)
return False
with io.BytesIO() as buffer:
buffer.write(data)
buffer.seek(0)
if not Int32.decode(buffer) == 0:
raise KafkaError('Expected a zero sized reply after sending credentials')
return True
def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
@@ -292,7 +391,8 @@ class BrokerConnection(object):
"""Returns True if still connecting (this may encompass several
different states, such as SSL handshake, authorization, etc)."""
return self.state in (ConnectionStates.CONNECTING,
ConnectionStates.HANDSHAKE)
ConnectionStates.HANDSHAKE,
ConnectionStates.AUTHENTICATING)
def disconnected(self):
"""Return True iff socket is closed"""
@@ -385,7 +485,7 @@ class BrokerConnection(object):
Return response if available
"""
assert not self._processing, 'Recursion not supported'
if not self.connected():
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
log.warning('%s cannot recv: socket not connected', self)
# If requests are pending, we should close the socket and
# fail all the pending request futures
@@ -436,6 +536,7 @@ class BrokerConnection(object):
if self._rbuffer.tell() == 4:
self._rbuffer.seek(0)
self._next_payload_bytes = Int32.decode(self._rbuffer)
log.debug('_next_payload_bytes: ' + str(self._next_payload_bytes))
# reset buffer and switch state to receiving payload bytes
self._rbuffer.seek(0)
self._rbuffer.truncate()
@@ -447,12 +548,14 @@ class BrokerConnection(object):
staged_bytes = self._rbuffer.tell()
try:
bytes_to_read = self._next_payload_bytes - staged_bytes
log.debug('bytes_to_read: ' + str(bytes_to_read))
data = self._sock.recv(bytes_to_read)
# We expect socket.recv to raise an exception if there is not
# enough data to read the full bytes_to_read
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
#raise Exception('Whyyyy?')
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
return None

View File

@@ -58,6 +58,14 @@ class CommitFailedError(KafkaError):
pass
class AuthenticationMethodNotSupported(KafkaError):
pass
class AuthenticationFailedError(KafkaError):
retriable = False
class BrokerResponseError(KafkaError):
errno = None
message = None
@@ -328,6 +336,18 @@ class InvalidTimestampError(BrokerResponseError):
description = ('The timestamp of the message is out of acceptable range.')
class UnsupportedSaslMechanismError(BrokerResponseError):
errno = 33
message = 'UNSUPPORTED_SASL_MECHANISM'
description = ('The broker does not support the requested SASL mechanism.')
class IllegalSaslStateError(BrokerResponseError):
errno = 34
message = 'ILLEGAL_SASL_STATE'
description = ('Request is not valid given the current SASL state.')
class KafkaUnavailableError(KafkaError):
pass

View File

@@ -276,6 +276,9 @@ class KafkaProducer(object):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'sasl_mechanism': None,
'sasl_username': None,
'sasl_password': None,
}
def __init__(self, **configs):
@@ -284,11 +287,11 @@ class KafkaProducer(object):
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
# Only check for extra config keys in top-level class
assert not configs, 'Unrecognized configs: %s' % configs
if self.config['client_id'] is None:
if self.config['client_id'] is None:K
self.config['client_id'] = 'kafka-python-producer-%s' % \
PRODUCER_CLIENT_ID_SEQUENCE.increment()

View File

@@ -78,3 +78,24 @@ class DescribeGroupsRequest_v0(Struct):
DescribeGroupsRequest = [DescribeGroupsRequest_v0]
DescribeGroupsResponse = [DescribeGroupsResponse_v0]
class SaslHandShakeResponse_v0(Struct):
API_KEY = 17
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('enabled_mechanisms', Array(String('utf-8')))
)
class SaslHandShakeRequest_v0(Struct):
API_KEY = 17
API_VERSION = 0
RESPONSE_TYPE = SaslHandShakeResponse_v0
SCHEMA = Schema(
('mechanism', String('utf-8'))
)
SaslHandShakeRequest = [SaslHandShakeRequest_v0]
SaslHandShakeResponse = [SaslHandShakeResponse_v0]