rename sasl_username and sasl_password to sasl_plain_*
This commit is contained in:
committed by
Dana Powers
parent
354e9a9c31
commit
40afc98dca
@@ -71,8 +71,8 @@ class KafkaClient(object):
|
||||
'metrics': None,
|
||||
'metric_group_prefix': '',
|
||||
'sasl_mechanism': None,
|
||||
'sasl_username': None,
|
||||
'sasl_password': None,
|
||||
'sasl_plain_username': None,
|
||||
'sasl_plain_password': None,
|
||||
}
|
||||
API_VERSIONS = [
|
||||
(0, 10),
|
||||
@@ -153,6 +153,13 @@ class KafkaClient(object):
|
||||
metrics (kafka.metrics.Metrics): Optionally provide a metrics
|
||||
instance for capturing network IO stats. Default: None.
|
||||
metric_group_prefix (str): Prefix for metric names. Default: ''
|
||||
sasl_mechanim (str): string picking sasl mechanism when security_protocol
|
||||
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
|
||||
Default: None
|
||||
sasl_plain_username (str): username for sasl PLAIN authentication.
|
||||
Default: None
|
||||
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
|
||||
Defualt: None
|
||||
"""
|
||||
self.config = copy.copy(self.DEFAULT_CONFIG)
|
||||
for key in self.config:
|
||||
|
||||
@@ -75,8 +75,8 @@ class BrokerConnection(object):
|
||||
'api_version': (0, 8, 2), # default to most restrictive
|
||||
'state_change_callback': lambda conn: True,
|
||||
'sasl_mechanism': None,
|
||||
'sasl_username': None,
|
||||
'sasl_password': None
|
||||
'sasl_plain_username': None,
|
||||
'sasl_plain_password': None
|
||||
}
|
||||
|
||||
def __init__(self, host, port, afi, **configs):
|
||||
@@ -225,7 +225,7 @@ class BrokerConnection(object):
|
||||
|
||||
if self.state is ConnectionStates.AUTHENTICATING:
|
||||
if self._try_authenticate():
|
||||
log.debug('%s: Authenticated as %s', str(self), self.config['sasl_username'])
|
||||
log.debug('%s: Authenticated as %s', str(self), self.config['sasl_plain_username'])
|
||||
self.state = ConnectionStates.CONNECTED
|
||||
self.config['state_change_callback'](self)
|
||||
|
||||
@@ -341,9 +341,9 @@ class BrokerConnection(object):
|
||||
try:
|
||||
self._sock.setblocking(True)
|
||||
# Send our credentials
|
||||
msg = bytes('\0'.join([self.config['sasl_username'],
|
||||
self.config['sasl_username'],
|
||||
self.config['sasl_password']]).encode('utf-8'))
|
||||
msg = bytes('\0'.join([self.config['sasl_plain_username'],
|
||||
self.config['sasl_plain_username'],
|
||||
self.config['sasl_plain_password']]).encode('utf-8'))
|
||||
size = Int32.encode(len(msg))
|
||||
self._sock.sendall(size + msg)
|
||||
|
||||
@@ -354,9 +354,9 @@ class BrokerConnection(object):
|
||||
data = data + self._sock.recv(4 - received_bytes)
|
||||
received_bytes = received_bytes + len(data)
|
||||
if not data:
|
||||
log.error('%s: Authentication failed for user %s', self, self.config['sasl_username'])
|
||||
log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
|
||||
self.close(error=Errors.ConnectionError('Authentication failed'))
|
||||
raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_username']))
|
||||
raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_plain_username']))
|
||||
self._sock.setblocking(False)
|
||||
except (AssertionError, ConnectionError) as e:
|
||||
log.exception("%s: Error receiving reply from server", self)
|
||||
|
||||
@@ -189,9 +189,9 @@ class KafkaConsumer(six.Iterator):
|
||||
sasl_mechanim (str): string picking sasl mechanism when security_protocol
|
||||
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
|
||||
Default: None
|
||||
sasl_username (str): username for sasl PLAIN authentication.
|
||||
sasl_plain_username (str): username for sasl PLAIN authentication.
|
||||
Default: None
|
||||
sasl_password (str): passowrd for sasl PLAIN authentication.
|
||||
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
|
||||
Defualt: None
|
||||
|
||||
Note:
|
||||
@@ -242,8 +242,8 @@ class KafkaConsumer(six.Iterator):
|
||||
'selector': selectors.DefaultSelector,
|
||||
'exclude_internal_topics': True,
|
||||
'sasl_mechanism': None,
|
||||
'sasl_username': None,
|
||||
'sasl_password': None,
|
||||
'sasl_plain_username': None,
|
||||
'sasl_plain_password': None,
|
||||
}
|
||||
|
||||
def __init__(self, *topics, **configs):
|
||||
|
||||
@@ -239,9 +239,9 @@ class KafkaProducer(object):
|
||||
sasl_mechanim (str): string picking sasl mechanism when security_protocol
|
||||
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
|
||||
Default: None
|
||||
sasl_username (str): username for sasl PLAIN authentication.
|
||||
sasl_plain_username (str): username for sasl PLAIN authentication.
|
||||
Default: None
|
||||
sasl_password (str): passowrd for sasl PLAIN authentication.
|
||||
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
|
||||
Defualt: None
|
||||
|
||||
Note:
|
||||
@@ -285,8 +285,8 @@ class KafkaProducer(object):
|
||||
'metrics_sample_window_ms': 30000,
|
||||
'selector': selectors.DefaultSelector,
|
||||
'sasl_mechanism': None,
|
||||
'sasl_username': None,
|
||||
'sasl_password': None,
|
||||
'sasl_plain_username': None,
|
||||
'sasl_plain_password': None,
|
||||
}
|
||||
|
||||
def __init__(self, **configs):
|
||||
|
||||
Reference in New Issue
Block a user