Merge pull request #315 from datastax/315

PYTHON-315 - Handle server warnings in protocol v4+
This commit is contained in:
Adam Holmberg
2015-05-18 16:56:59 -05:00
2 changed files with 35 additions and 0 deletions

View File

@@ -2730,6 +2730,7 @@ class ResponseFuture(object):
_metrics = None
_paging_state = None
_custom_payload = None
_warnings = None
def __init__(self, session, message, query, default_timeout=None, metrics=None, prepared_statement=None):
self.session = session
@@ -2817,6 +2818,24 @@ class ResponseFuture(object):
"""
return self._paging_state is not None
@property
def warnings(self):
"""
Warnings returned from the server, if any. This will only be
set for protocol_version 4+.
Warnings may be returned for such things as oversized batches,
or too many tombstones in slice queries.
Ensure the future is complete before trying to access this property
(call :meth:`.result()`, or after callback is invoked).
Otherwise it may throw if the response has not been received.
"""
# TODO: When timers are introduced, just make this wait
if not self._event.is_set():
raise Exception("warnings cannot be retrieved before ResponseFuture is finalized")
return self._warnings
@property
def custom_payload(self):
"""
@@ -2830,6 +2849,7 @@ class ResponseFuture(object):
:return: :ref:`custom_payload`.
"""
# TODO: When timers are introduced, just make this wait
if not self._event.is_set():
raise Exception("custom_payload cannot be retrieved before ResponseFuture is finalized")
return self._custom_payload
@@ -2872,6 +2892,7 @@ class ResponseFuture(object):
self.query.trace_id = trace_id
self._query_trace = QueryTrace(trace_id, self.session)
self._warnings = getattr(response, 'warnings', None)
self._custom_payload = getattr(response, 'custom_payload', None)
if isinstance(response, ResultMessage):

View File

@@ -57,6 +57,7 @@ HEADER_DIRECTION_MASK = 0x80
COMPRESSED_FLAG = 0x01
TRACING_FLAG = 0x02
CUSTOM_PAYLOAD_FLAG = 0x04
WARNING_FLAG = 0x08
_message_types_by_name = {}
_message_types_by_opcode = {}
@@ -74,6 +75,7 @@ class _MessageType(object):
tracing = False
custom_payload = None
warnings = None
def to_binary(self, stream_id, protocol_version, compression=None):
flags = 0
@@ -133,6 +135,12 @@ def decode_response(protocol_version, user_type_map, stream_id, flags, opcode, b
else:
trace_id = None
if flags & WARNING_FLAG:
warnings = read_stringlist(body)
flags ^= WARNING_FLAG
else:
warnings = None
if flags & CUSTOM_PAYLOAD_FLAG:
custom_payload = read_bytesmap(body)
flags ^= CUSTOM_PAYLOAD_FLAG
@@ -147,6 +155,12 @@ def decode_response(protocol_version, user_type_map, stream_id, flags, opcode, b
msg.stream_id = stream_id
msg.trace_id = trace_id
msg.custom_payload = custom_payload
msg.warnings = warnings
if msg.warnings:
for w in msg.warnings:
log.warning("Server warning: %s", w)
return msg