Support v2 protocol authentication

Fixes #125, PYTHON-73
This commit is contained in:
Tyler Hobbs
2014-05-23 14:27:39 -05:00
parent 4520c64a29
commit 9947a67e87
7 changed files with 248 additions and 14 deletions

View File

@@ -5,6 +5,7 @@ In Progress
Features
--------
* Make libev C extension Python3-compatible (PYTHON-70)
* Support v2 protocol authentication (PYTHON-73, github #125)
Bug Fixes
---------

125
cassandra/auth.py Normal file
View File

@@ -0,0 +1,125 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
class AuthProvider(object):
"""
An abstract class that defines the interface that will be used for
creating :class:`~.Authenticator` instances when opening new
connections to Cassandra.
.. versionadded:: 2.0.0
"""
def new_authenticator(self, host):
"""
Implementations of this class should return a new instance
of :class:`~.Authenticator` or one of its subclasses.
"""
raise NotImplementedError()
class Authenticator(object):
"""
An abstract class that handles SASL authentication with Cassandra servers.
Each time a new connection is created and the server requires authentication,
a new instance of this class will be created by the corresponding
:class:`~.AuthProvider` to handler that authentication. The lifecycle of the
new :class:`~.Authenticator` will the be:
1) The :meth:`~.initial_response()` method will be called. The return
value will be sent to the server to initiate the handshake.
2) The server will respond to each client response by either issuing a
challenge or indicating that the authentication is complete (successful or not).
If a new challenge is issued, :meth:`~.evaluate_challenge()`
will be called to produce a response that will be sent to the
server. This challenge/response negotiation will continue until the server
responds that authentication is successful (or an :exc:`~.AuthenticationFailed`
is raised).
3) When the server indicates that authentication is successful,
:meth:`~.on_authentication_success` will be called a token string that
that the server may optionally have sent.
The exact nature of the negotiation between the client and server is specific
to the authentication mechanism configured server-side.
.. versionadded:: 2.0.0
"""
def initial_response(self):
"""
Returns an message to send to the server to initiate the SASL handshake.
:const:`None` may be returned to send an empty message.
"""
return None
def evaluate_challenge(self, challenge):
"""
Called when the server sends a challenge message. Generally, this method
should return :const:`None` when authentication is complete from a
client perspective. Otherwise, a string should be returned.
"""
raise NotImplementedError()
def on_authentication_success(self, token):
"""
Called when the server indicates that authentication was successful.
Depending on the authentication mechanism, `token` may be :const:`None`
or a string.
"""
pass
class PlainTextAuthProvider(AuthProvider):
"""
An :class:`~.AuthProvider` that works with Cassandra's PasswordAuthenticator.
Example usage::
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
auth_provider = PlainTextAuthProvider(
username='cassandra', password='cassandra')
cluster = Cluster(auth_provider=auth_provider)
.. versionadded:: 2.0.0
"""
def __init__(self, username, password):
self.username = username
self.password = password
def new_authenticator(self, host):
return PlainTextAuthenticator(self.username, self.password)
class PlainTextAuthenticator(Authenticator):
"""
An :class:`~.Authenticator` that works with Cassandra's PasswordAuthenticator.
.. versionadded:: 2.0.0
"""
def __init__(self, username, password):
self.username = username
self.password = password
def initial_response(self):
return "\x00%s\x00%s" % (self.username, self.password)
def evaluate_challenge(self, challenge):
return None

View File

@@ -200,8 +200,15 @@ class Cluster(object):
auth_provider = None
"""
An optional function that accepts one argument, the IP address of a node,
When :attr:`~.Cluster.protocol_version` is 2 or higher, this should
be an instance of a subclass of :class:`~cassandra.auth.AuthProvider`,
such ass :class:`~.PlainTextAuthProvider`.
When :attr:`~.Cluster.protocol_version` is 1, this should be
a function that accepts one argument, the IP address of a node,
and returns a dict of credentials for that node.
When not using authentication, this should be left as :const:`None`.
"""
load_balancing_policy = None
@@ -334,9 +341,13 @@ class Cluster(object):
self.compression = compression
if auth_provider is not None:
if not callable(auth_provider):
raise ValueError("auth_provider must be callable")
self.auth_provider = auth_provider
if not hasattr(auth_provider, 'new_authenticator'):
if protocol_version > 1:
raise TypeError("auth_provider must implement the cassandra.auth.AuthProvider "
"interface when protocol_version >= 2")
self.auth_provider = auth_provider
else:
self.auth_provider = auth_provider.new_authenticator
if load_balancing_policy is not None:
if isinstance(load_balancing_policy, type):
@@ -482,7 +493,7 @@ class Cluster(object):
def _make_connection_kwargs(self, address, kwargs_dict):
if self.auth_provider:
kwargs_dict['credentials'] = self.auth_provider(address)
kwargs_dict['authenticator'] = self.auth_provider(address)
kwargs_dict['port'] = self.port
kwargs_dict['compression'] = self.compression

View File

@@ -33,7 +33,9 @@ from cassandra.marshal import int32_pack, header_unpack
from cassandra.protocol import (ReadyMessage, AuthenticateMessage, OptionsMessage,
StartupMessage, ErrorMessage, CredentialsMessage,
QueryMessage, ResultMessage, decode_response,
InvalidRequestException, SupportedMessage)
InvalidRequestException, SupportedMessage,
AuthResponseMessage, AuthChallengeMessage,
AuthSuccessMessage)
from cassandra.util import OrderedDict
@@ -157,12 +159,12 @@ class Connection(object):
is_control_connection = False
def __init__(self, host='127.0.0.1', port=9042, credentials=None,
def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
ssl_options=None, sockopts=None, compression=True,
cql_version=None, protocol_version=2, is_control_connection=False):
self.host = host
self.port = port
self.credentials = credentials
self.authenticator = authenticator
self.ssl_options = ssl_options
self.sockopts = sockopts
self.compression = compression
@@ -425,15 +427,24 @@ class Connection(object):
self.compressor = self._compressor
self.connected_event.set()
elif isinstance(startup_response, AuthenticateMessage):
log.debug("Got AuthenticateMessage on new connection (%s) from %s", id(self), self.host)
log.debug("Got AuthenticateMessage on new connection (%s) from %s: %s",
id(self), self.host, startup_response.authenticator)
if self.credentials is None:
if self.authenticator is None:
raise AuthenticationFailed('Remote end requires authentication.')
self.authenticator = startup_response.authenticator
cm = CredentialsMessage(creds=self.credentials)
callback = partial(self._handle_startup_response, did_authenticate=True)
self.send_msg(cm, cb=callback)
self.authenticator_class = startup_response.authenticator
if isinstance(self.authenticator, dict):
log.debug("Sending credentials-based auth response on %s", self)
cm = CredentialsMessage(creds=self.authenticator)
callback = partial(self._handle_startup_response, did_authenticate=True)
self.send_msg(cm, cb=callback)
else:
log.debug("Sending SASL-based auth response on %s", self)
initial_response = self.authenticator.initial_response()
initial_response = "" if initial_response is None else initial_response.encode('utf-8')
self.send_msg(AuthResponseMessage(initial_response), self._handle_auth_response)
elif isinstance(startup_response, ErrorMessage):
log.debug("Received ErrorMessage on new connection (%s) from %s: %s",
id(self), self.host, startup_response.summary_msg())
@@ -453,6 +464,35 @@ class Connection(object):
log.error(msg, startup_response)
raise ProtocolError(msg % (startup_response,))
@defunct_on_error
def _handle_auth_response(self, auth_response):
if self.is_defunct:
return
if isinstance(auth_response, AuthSuccessMessage):
log.debug("Connection %s successfully authenticated", self)
self.authenticator.on_authentication_success(auth_response.token)
if self._compressor:
self.compressor = self._compressor
self.connected_event.set()
elif isinstance(auth_response, AuthChallengeMessage):
response = self.authenticator.evaluate_challenge(auth_response.challenge)
msg = AuthResponseMessage("" if response is None else response)
self.send_msg(msg, self._handle_auth_response)
elif isinstance(auth_response, ErrorMessage):
log.debug("Received ErrorMessage on new connection (%s) from %s: %s",
id(self), self.host, auth_response.summary_msg())
raise AuthenticationFailed(
"Failed to authenticate to %s: %s" %
(self.host, auth_response.summary_msg()))
elif isinstance(auth_response, ConnectionShutdown):
log.debug("Connection to %s was closed during the authentication process", self.host)
raise auth_response
else:
msg = "Unexpected response during Connection authentication to %s: %r"
log.error(msg, self.host, auth_response)
raise ProtocolError(msg % (self.host, auth_response))
def set_keyspace_blocking(self, keyspace):
if not keyspace or keyspace == self.keyspace:
return

View File

@@ -193,6 +193,11 @@ class ProtocolException(ErrorMessageSub):
error_code = 0x000A
class BadCredentials(ErrorMessageSub):
summary = 'Bad credentials'
error_code = 0x0100
class UnavailableErrorMessage(RequestExecutionException):
summary = 'Unavailable exception'
error_code = 0x1000
@@ -369,6 +374,41 @@ class CredentialsMessage(_MessageType):
write_string(f, credval)
class AuthChallengeMessage(_MessageType):
opcode = 0x0E
name = 'AUTH_CHALLENGE'
def __init__(self, challenge):
self.challenge = challenge
@classmethod
def recv_body(cls, f):
return cls(read_longstring(f))
class AuthResponseMessage(_MessageType):
opcode = 0x0F
name = 'AUTH_RESPONSE'
def __init__(self, response):
self.response = response
def send_body(self, f, protocol_version):
write_longstring(f, self.response)
class AuthSuccessMessage(_MessageType):
opcode = 0x10
name = 'AUTH_SUCCESS'
def __init__(self, token):
self.token = token
@classmethod
def recv_body(cls, f):
return cls(read_longstring(f))
class OptionsMessage(_MessageType):
opcode = 0x05
name = 'OPTIONS'

View File

@@ -0,0 +1,16 @@
``cassandra.auth`` - Authentication
===================================
.. module:: cassandra.auth
.. autoclass:: AuthProvider
:members:
.. autoclass:: Authenticator
:members:
.. autoclass:: PlainTextAuthProvider
:members:
.. autoclass:: PlainTextAuthenticator
:members:

View File

@@ -7,6 +7,7 @@ API Documentation
cassandra
cassandra/cluster
cassandra/policies
cassandra/auth
cassandra/metadata
cassandra/query
cassandra/pool