Support v5 error code map for failure messages

Done for PYTHON-619
This commit is contained in:
Tyler Hobbs
2016-08-05 18:09:40 -05:00
parent 47ec211116
commit 58cba38e1f
2 changed files with 90 additions and 30 deletions

View File

@@ -325,16 +325,34 @@ class CoordinationFailure(RequestExecutionException):
The number of replicas that sent a failure message
"""
def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None, failures=None):
error_code_map = None
"""
A map of inet addresses to error codes representing replicas that sent
a failure message. Only set when `protocol_version` is 5 or higher.
"""
def __init__(self, summary_message, consistency=None, required_responses=None,
received_responses=None, failures=None, error_code_map=None):
self.consistency = consistency
self.required_responses = required_responses
self.received_responses = received_responses
self.failures = failures
Exception.__init__(self, summary_message + ' info=' +
repr({'consistency': consistency_value_to_name(consistency),
'required_responses': required_responses,
'received_responses': received_responses,
'failures': failures}))
self.error_code_map = error_code_map
info_dict = {
'consistency': consistency_value_to_name(consistency),
'required_responses': required_responses,
'received_responses': received_responses,
'failures': failures
}
if error_code_map is not None:
# make error codes look like "0x002a"
formatted_map = dict((addr, '0x%04x' % err_code)
for (addr, err_code) in error_code_map.items())
info_dict['error_code_map'] = formatted_map
Exception.__init__(self, summary_message + ' info=' + repr(info_dict))
class ReadFailure(CoordinationFailure):

View File

@@ -127,11 +127,11 @@ class ErrorMessage(_MessageType, Exception):
self.info = info
@classmethod
def recv_body(cls, f, *args):
def recv_body(cls, f, protocol_version, *args):
code = read_int(f)
msg = read_string(f)
subcls = error_classes.get(code, cls)
extra_info = subcls.recv_error_info(f)
extra_info = subcls.recv_error_info(f, protocol_version)
return subcls(code=code, message=msg, info=extra_info)
def summary_msg(self):
@@ -146,7 +146,7 @@ class ErrorMessage(_MessageType, Exception):
__repr__ = __str__
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
pass
def to_exception(self):
@@ -192,7 +192,7 @@ class UnavailableErrorMessage(RequestExecutionException):
error_code = 0x1000
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
return {
'consistency': read_consistency_level(f),
'required_replicas': read_int(f),
@@ -223,7 +223,7 @@ class WriteTimeoutErrorMessage(RequestExecutionException):
error_code = 0x1100
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
return {
'consistency': read_consistency_level(f),
'received_responses': read_int(f),
@@ -240,7 +240,7 @@ class ReadTimeoutErrorMessage(RequestExecutionException):
error_code = 0x1200
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
return {
'consistency': read_consistency_level(f),
'received_responses': read_int(f),
@@ -257,13 +257,27 @@ class ReadFailureMessage(RequestExecutionException):
error_code = 0x1300
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
consistency = read_consistency_level(f)
received_responses = read_int(f)
required_responses = read_int(f)
if protocol_version >= 5:
error_code_map = read_error_code_map(f)
failures = len(error_code_map)
else:
error_code_map = None
failures = read_int(f)
data_retrieved = bool(read_byte(f))
return {
'consistency': read_consistency_level(f),
'received_responses': read_int(f),
'required_responses': read_int(f),
'failures': read_int(f),
'data_retrieved': bool(read_byte(f)),
'consistency': consistency,
'received_responses': received_responses,
'required_responses': required_responses,
'failures': failures,
'error_code_map': error_code_map,
'data_retrieved': data_retrieved
}
def to_exception(self):
@@ -275,7 +289,7 @@ class FunctionFailureMessage(RequestExecutionException):
error_code = 0x1400
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
return {
'keyspace': read_string(f),
'function': read_string(f),
@@ -291,13 +305,27 @@ class WriteFailureMessage(RequestExecutionException):
error_code = 0x1500
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
consistency = read_consistency_level(f)
received_responses = read_int(f)
required_responses = read_int(f)
if protocol_version >= 5:
error_code_map = read_error_code_map(f)
failures = len(error_code_map)
else:
error_code_map = None
failures = read_int(f)
write_type = WriteType.name_to_value[read_string(f)]
return {
'consistency': read_consistency_level(f),
'received_responses': read_int(f),
'required_responses': read_int(f),
'failures': read_int(f),
'write_type': WriteType.name_to_value[read_string(f)],
'consistency': consistency,
'received_responses': received_responses,
'required_responses': required_responses,
'failures': failures,
'error_code_map': error_code_map,
'write_type': write_type
}
def to_exception(self):
@@ -335,7 +363,7 @@ class PreparedQueryNotFound(RequestValidationException):
error_code = 0x2500
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
# return the query ID
return read_binary_string(f)
@@ -345,7 +373,7 @@ class AlreadyExistsException(ConfigurationException):
error_code = 0x2400
@staticmethod
def recv_error_info(f):
def recv_error_info(f, protocol_version):
return {
'keyspace': read_string(f),
'table': read_string(f),
@@ -1224,6 +1252,15 @@ def write_stringmultimap(f, strmmap):
write_stringlist(f, v)
def read_error_code_map(f):
numpairs = read_int(f)
error_code_map = {}
for _ in range(numpairs):
endpoint = read_inet_addr_only(f)
error_code_map[endpoint] = read_short(f)
return error_code_map
def read_value(f):
size = read_int(f)
if size < 0:
@@ -1241,17 +1278,22 @@ def write_value(f, v):
f.write(v)
def read_inet(f):
def read_inet_addr_only(f):
size = read_byte(f)
addrbytes = f.read(size)
port = read_int(f)
if size == 4:
addrfam = socket.AF_INET
elif size == 16:
addrfam = socket.AF_INET6
else:
raise InternalError("bad inet address: %r" % (addrbytes,))
return (util.inet_ntop(addrfam, addrbytes), port)
return util.inet_ntop(addrfam, addrbytes)
def read_inet(f):
addr = read_inet_addr_only(f)
port = read_int(f)
return (addr, port)
def write_inet(f, addrtuple):