diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 20680121..f72139f3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,7 @@ In Progress Features -------- * Make libev C extension Python3-compatible (PYTHON-70) +* Support v2 protocol authentication (PYTHON-73, github #125) Bug Fixes --------- diff --git a/cassandra/auth.py b/cassandra/auth.py new file mode 100644 index 00000000..477d1b4d --- /dev/null +++ b/cassandra/auth.py @@ -0,0 +1,125 @@ +# Copyright 2013-2014 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + + +class AuthProvider(object): + """ + An abstract class that defines the interface that will be used for + creating :class:`~.Authenticator` instances when opening new + connections to Cassandra. + + .. versionadded:: 2.0.0 + """ + + def new_authenticator(self, host): + """ + Implementations of this class should return a new instance + of :class:`~.Authenticator` or one of its subclasses. + """ + raise NotImplementedError() + + +class Authenticator(object): + """ + An abstract class that handles SASL authentication with Cassandra servers. + + Each time a new connection is created and the server requires authentication, + a new instance of this class will be created by the corresponding + :class:`~.AuthProvider` to handler that authentication. The lifecycle of the + new :class:`~.Authenticator` will the be: + + 1) The :meth:`~.initial_response()` method will be called. The return + value will be sent to the server to initiate the handshake. + + 2) The server will respond to each client response by either issuing a + challenge or indicating that the authentication is complete (successful or not). + If a new challenge is issued, :meth:`~.evaluate_challenge()` + will be called to produce a response that will be sent to the + server. This challenge/response negotiation will continue until the server + responds that authentication is successful (or an :exc:`~.AuthenticationFailed` + is raised). + + 3) When the server indicates that authentication is successful, + :meth:`~.on_authentication_success` will be called a token string that + that the server may optionally have sent. + + The exact nature of the negotiation between the client and server is specific + to the authentication mechanism configured server-side. + + .. versionadded:: 2.0.0 + """ + + def initial_response(self): + """ + Returns an message to send to the server to initiate the SASL handshake. + :const:`None` may be returned to send an empty message. + """ + return None + + def evaluate_challenge(self, challenge): + """ + Called when the server sends a challenge message. Generally, this method + should return :const:`None` when authentication is complete from a + client perspective. Otherwise, a string should be returned. + """ + raise NotImplementedError() + + def on_authentication_success(self, token): + """ + Called when the server indicates that authentication was successful. + Depending on the authentication mechanism, `token` may be :const:`None` + or a string. + """ + pass + + +class PlainTextAuthProvider(AuthProvider): + """ + An :class:`~.AuthProvider` that works with Cassandra's PasswordAuthenticator. + + Example usage:: + + from cassandra.cluster import Cluster + from cassandra.auth import PlainTextAuthProvider + + auth_provider = PlainTextAuthProvider( + username='cassandra', password='cassandra') + cluster = Cluster(auth_provider=auth_provider) + + .. versionadded:: 2.0.0 + """ + + def __init__(self, username, password): + self.username = username + self.password = password + + def new_authenticator(self, host): + return PlainTextAuthenticator(self.username, self.password) + + +class PlainTextAuthenticator(Authenticator): + """ + An :class:`~.Authenticator` that works with Cassandra's PasswordAuthenticator. + + .. versionadded:: 2.0.0 + """ + + def __init__(self, username, password): + self.username = username + self.password = password + + def initial_response(self): + return "\x00%s\x00%s" % (self.username, self.password) + + def evaluate_challenge(self, challenge): + return None diff --git a/cassandra/cluster.py b/cassandra/cluster.py index a9b3417d..3f60687a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -200,8 +200,15 @@ class Cluster(object): auth_provider = None """ - An optional function that accepts one argument, the IP address of a node, + When :attr:`~.Cluster.protocol_version` is 2 or higher, this should + be an instance of a subclass of :class:`~cassandra.auth.AuthProvider`, + such ass :class:`~.PlainTextAuthProvider`. + + When :attr:`~.Cluster.protocol_version` is 1, this should be + a function that accepts one argument, the IP address of a node, and returns a dict of credentials for that node. + + When not using authentication, this should be left as :const:`None`. """ load_balancing_policy = None @@ -334,9 +341,13 @@ class Cluster(object): self.compression = compression if auth_provider is not None: - if not callable(auth_provider): - raise ValueError("auth_provider must be callable") - self.auth_provider = auth_provider + if not hasattr(auth_provider, 'new_authenticator'): + if protocol_version > 1: + raise TypeError("auth_provider must implement the cassandra.auth.AuthProvider " + "interface when protocol_version >= 2") + self.auth_provider = auth_provider + else: + self.auth_provider = auth_provider.new_authenticator if load_balancing_policy is not None: if isinstance(load_balancing_policy, type): @@ -482,7 +493,7 @@ class Cluster(object): def _make_connection_kwargs(self, address, kwargs_dict): if self.auth_provider: - kwargs_dict['credentials'] = self.auth_provider(address) + kwargs_dict['authenticator'] = self.auth_provider(address) kwargs_dict['port'] = self.port kwargs_dict['compression'] = self.compression diff --git a/cassandra/connection.py b/cassandra/connection.py index a7604341..0660382c 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -33,7 +33,9 @@ from cassandra.marshal import int32_pack, header_unpack from cassandra.protocol import (ReadyMessage, AuthenticateMessage, OptionsMessage, StartupMessage, ErrorMessage, CredentialsMessage, QueryMessage, ResultMessage, decode_response, - InvalidRequestException, SupportedMessage) + InvalidRequestException, SupportedMessage, + AuthResponseMessage, AuthChallengeMessage, + AuthSuccessMessage) from cassandra.util import OrderedDict @@ -157,12 +159,12 @@ class Connection(object): is_control_connection = False - def __init__(self, host='127.0.0.1', port=9042, credentials=None, + def __init__(self, host='127.0.0.1', port=9042, authenticator=None, ssl_options=None, sockopts=None, compression=True, cql_version=None, protocol_version=2, is_control_connection=False): self.host = host self.port = port - self.credentials = credentials + self.authenticator = authenticator self.ssl_options = ssl_options self.sockopts = sockopts self.compression = compression @@ -425,15 +427,24 @@ class Connection(object): self.compressor = self._compressor self.connected_event.set() elif isinstance(startup_response, AuthenticateMessage): - log.debug("Got AuthenticateMessage on new connection (%s) from %s", id(self), self.host) + log.debug("Got AuthenticateMessage on new connection (%s) from %s: %s", + id(self), self.host, startup_response.authenticator) - if self.credentials is None: + if self.authenticator is None: raise AuthenticationFailed('Remote end requires authentication.') - self.authenticator = startup_response.authenticator - cm = CredentialsMessage(creds=self.credentials) - callback = partial(self._handle_startup_response, did_authenticate=True) - self.send_msg(cm, cb=callback) + self.authenticator_class = startup_response.authenticator + + if isinstance(self.authenticator, dict): + log.debug("Sending credentials-based auth response on %s", self) + cm = CredentialsMessage(creds=self.authenticator) + callback = partial(self._handle_startup_response, did_authenticate=True) + self.send_msg(cm, cb=callback) + else: + log.debug("Sending SASL-based auth response on %s", self) + initial_response = self.authenticator.initial_response() + initial_response = "" if initial_response is None else initial_response.encode('utf-8') + self.send_msg(AuthResponseMessage(initial_response), self._handle_auth_response) elif isinstance(startup_response, ErrorMessage): log.debug("Received ErrorMessage on new connection (%s) from %s: %s", id(self), self.host, startup_response.summary_msg()) @@ -453,6 +464,35 @@ class Connection(object): log.error(msg, startup_response) raise ProtocolError(msg % (startup_response,)) + @defunct_on_error + def _handle_auth_response(self, auth_response): + if self.is_defunct: + return + + if isinstance(auth_response, AuthSuccessMessage): + log.debug("Connection %s successfully authenticated", self) + self.authenticator.on_authentication_success(auth_response.token) + if self._compressor: + self.compressor = self._compressor + self.connected_event.set() + elif isinstance(auth_response, AuthChallengeMessage): + response = self.authenticator.evaluate_challenge(auth_response.challenge) + msg = AuthResponseMessage("" if response is None else response) + self.send_msg(msg, self._handle_auth_response) + elif isinstance(auth_response, ErrorMessage): + log.debug("Received ErrorMessage on new connection (%s) from %s: %s", + id(self), self.host, auth_response.summary_msg()) + raise AuthenticationFailed( + "Failed to authenticate to %s: %s" % + (self.host, auth_response.summary_msg())) + elif isinstance(auth_response, ConnectionShutdown): + log.debug("Connection to %s was closed during the authentication process", self.host) + raise auth_response + else: + msg = "Unexpected response during Connection authentication to %s: %r" + log.error(msg, self.host, auth_response) + raise ProtocolError(msg % (self.host, auth_response)) + def set_keyspace_blocking(self, keyspace): if not keyspace or keyspace == self.keyspace: return diff --git a/cassandra/protocol.py b/cassandra/protocol.py index e1c7ba88..e9cb59f4 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -193,6 +193,11 @@ class ProtocolException(ErrorMessageSub): error_code = 0x000A +class BadCredentials(ErrorMessageSub): + summary = 'Bad credentials' + error_code = 0x0100 + + class UnavailableErrorMessage(RequestExecutionException): summary = 'Unavailable exception' error_code = 0x1000 @@ -369,6 +374,41 @@ class CredentialsMessage(_MessageType): write_string(f, credval) +class AuthChallengeMessage(_MessageType): + opcode = 0x0E + name = 'AUTH_CHALLENGE' + + def __init__(self, challenge): + self.challenge = challenge + + @classmethod + def recv_body(cls, f): + return cls(read_longstring(f)) + + +class AuthResponseMessage(_MessageType): + opcode = 0x0F + name = 'AUTH_RESPONSE' + + def __init__(self, response): + self.response = response + + def send_body(self, f, protocol_version): + write_longstring(f, self.response) + + +class AuthSuccessMessage(_MessageType): + opcode = 0x10 + name = 'AUTH_SUCCESS' + + def __init__(self, token): + self.token = token + + @classmethod + def recv_body(cls, f): + return cls(read_longstring(f)) + + class OptionsMessage(_MessageType): opcode = 0x05 name = 'OPTIONS' diff --git a/docs/api/cassandra/auth.rst b/docs/api/cassandra/auth.rst new file mode 100644 index 00000000..0ee6e539 --- /dev/null +++ b/docs/api/cassandra/auth.rst @@ -0,0 +1,16 @@ +``cassandra.auth`` - Authentication +=================================== + +.. module:: cassandra.auth + +.. autoclass:: AuthProvider + :members: + +.. autoclass:: Authenticator + :members: + +.. autoclass:: PlainTextAuthProvider + :members: + +.. autoclass:: PlainTextAuthenticator + :members: diff --git a/docs/api/index.rst b/docs/api/index.rst index d7695233..d8106d10 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -7,6 +7,7 @@ API Documentation cassandra cassandra/cluster cassandra/policies + cassandra/auth cassandra/metadata cassandra/query cassandra/pool