From 58cba38e1f918ea5771d62a5e2934fea13d9b7c5 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 5 Aug 2016 18:09:40 -0500 Subject: [PATCH] Support v5 error code map for failure messages Done for PYTHON-619 --- cassandra/__init__.py | 30 ++++++++++++--- cassandra/protocol.py | 90 +++++++++++++++++++++++++++++++------------ 2 files changed, 90 insertions(+), 30 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 01589103..2fc22a04 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -325,16 +325,34 @@ class CoordinationFailure(RequestExecutionException): The number of replicas that sent a failure message """ - def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None, failures=None): + error_code_map = None + """ + A map of inet addresses to error codes representing replicas that sent + a failure message. Only set when `protocol_version` is 5 or higher. + """ + + def __init__(self, summary_message, consistency=None, required_responses=None, + received_responses=None, failures=None, error_code_map=None): self.consistency = consistency self.required_responses = required_responses self.received_responses = received_responses self.failures = failures - Exception.__init__(self, summary_message + ' info=' + - repr({'consistency': consistency_value_to_name(consistency), - 'required_responses': required_responses, - 'received_responses': received_responses, - 'failures': failures})) + self.error_code_map = error_code_map + + info_dict = { + 'consistency': consistency_value_to_name(consistency), + 'required_responses': required_responses, + 'received_responses': received_responses, + 'failures': failures + } + + if error_code_map is not None: + # make error codes look like "0x002a" + formatted_map = dict((addr, '0x%04x' % err_code) + for (addr, err_code) in error_code_map.items()) + info_dict['error_code_map'] = formatted_map + + Exception.__init__(self, summary_message + ' info=' + repr(info_dict)) class ReadFailure(CoordinationFailure): diff --git a/cassandra/protocol.py b/cassandra/protocol.py index e63966fe..53c1d694 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -127,11 +127,11 @@ class ErrorMessage(_MessageType, Exception): self.info = info @classmethod - def recv_body(cls, f, *args): + def recv_body(cls, f, protocol_version, *args): code = read_int(f) msg = read_string(f) subcls = error_classes.get(code, cls) - extra_info = subcls.recv_error_info(f) + extra_info = subcls.recv_error_info(f, protocol_version) return subcls(code=code, message=msg, info=extra_info) def summary_msg(self): @@ -146,7 +146,7 @@ class ErrorMessage(_MessageType, Exception): __repr__ = __str__ @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): pass def to_exception(self): @@ -192,7 +192,7 @@ class UnavailableErrorMessage(RequestExecutionException): error_code = 0x1000 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): return { 'consistency': read_consistency_level(f), 'required_replicas': read_int(f), @@ -223,7 +223,7 @@ class WriteTimeoutErrorMessage(RequestExecutionException): error_code = 0x1100 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): return { 'consistency': read_consistency_level(f), 'received_responses': read_int(f), @@ -240,7 +240,7 @@ class ReadTimeoutErrorMessage(RequestExecutionException): error_code = 0x1200 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): return { 'consistency': read_consistency_level(f), 'received_responses': read_int(f), @@ -257,13 +257,27 @@ class ReadFailureMessage(RequestExecutionException): error_code = 0x1300 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): + consistency = read_consistency_level(f) + received_responses = read_int(f) + required_responses = read_int(f) + + if protocol_version >= 5: + error_code_map = read_error_code_map(f) + failures = len(error_code_map) + else: + error_code_map = None + failures = read_int(f) + + data_retrieved = bool(read_byte(f)) + return { - 'consistency': read_consistency_level(f), - 'received_responses': read_int(f), - 'required_responses': read_int(f), - 'failures': read_int(f), - 'data_retrieved': bool(read_byte(f)), + 'consistency': consistency, + 'received_responses': received_responses, + 'required_responses': required_responses, + 'failures': failures, + 'error_code_map': error_code_map, + 'data_retrieved': data_retrieved } def to_exception(self): @@ -275,7 +289,7 @@ class FunctionFailureMessage(RequestExecutionException): error_code = 0x1400 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): return { 'keyspace': read_string(f), 'function': read_string(f), @@ -291,13 +305,27 @@ class WriteFailureMessage(RequestExecutionException): error_code = 0x1500 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): + consistency = read_consistency_level(f) + received_responses = read_int(f) + required_responses = read_int(f) + + if protocol_version >= 5: + error_code_map = read_error_code_map(f) + failures = len(error_code_map) + else: + error_code_map = None + failures = read_int(f) + + write_type = WriteType.name_to_value[read_string(f)] + return { - 'consistency': read_consistency_level(f), - 'received_responses': read_int(f), - 'required_responses': read_int(f), - 'failures': read_int(f), - 'write_type': WriteType.name_to_value[read_string(f)], + 'consistency': consistency, + 'received_responses': received_responses, + 'required_responses': required_responses, + 'failures': failures, + 'error_code_map': error_code_map, + 'write_type': write_type } def to_exception(self): @@ -335,7 +363,7 @@ class PreparedQueryNotFound(RequestValidationException): error_code = 0x2500 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): # return the query ID return read_binary_string(f) @@ -345,7 +373,7 @@ class AlreadyExistsException(ConfigurationException): error_code = 0x2400 @staticmethod - def recv_error_info(f): + def recv_error_info(f, protocol_version): return { 'keyspace': read_string(f), 'table': read_string(f), @@ -1224,6 +1252,15 @@ def write_stringmultimap(f, strmmap): write_stringlist(f, v) +def read_error_code_map(f): + numpairs = read_int(f) + error_code_map = {} + for _ in range(numpairs): + endpoint = read_inet_addr_only(f) + error_code_map[endpoint] = read_short(f) + return error_code_map + + def read_value(f): size = read_int(f) if size < 0: @@ -1241,17 +1278,22 @@ def write_value(f, v): f.write(v) -def read_inet(f): +def read_inet_addr_only(f): size = read_byte(f) addrbytes = f.read(size) - port = read_int(f) if size == 4: addrfam = socket.AF_INET elif size == 16: addrfam = socket.AF_INET6 else: raise InternalError("bad inet address: %r" % (addrbytes,)) - return (util.inet_ntop(addrfam, addrbytes), port) + return util.inet_ntop(addrfam, addrbytes) + + +def read_inet(f): + addr = read_inet_addr_only(f) + port = read_int(f) + return (addr, port) def write_inet(f, addrtuple):