diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 7b434a96..afb49be8 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2730,6 +2730,7 @@ class ResponseFuture(object): _metrics = None _paging_state = None _custom_payload = None + _warnings = None def __init__(self, session, message, query, default_timeout=None, metrics=None, prepared_statement=None): self.session = session @@ -2817,6 +2818,24 @@ class ResponseFuture(object): """ return self._paging_state is not None + @property + def warnings(self): + """ + Warnings returned from the server, if any. This will only be + set for protocol_version 4+. + + Warnings may be returned for such things as oversized batches, + or too many tombstones in slice queries. + + Ensure the future is complete before trying to access this property + (call :meth:`.result()`, or after callback is invoked). + Otherwise it may throw if the response has not been received. + """ + # TODO: When timers are introduced, just make this wait + if not self._event.is_set(): + raise Exception("warnings cannot be retrieved before ResponseFuture is finalized") + return self._warnings + @property def custom_payload(self): """ @@ -2830,6 +2849,7 @@ class ResponseFuture(object): :return: :ref:`custom_payload`. """ + # TODO: When timers are introduced, just make this wait if not self._event.is_set(): raise Exception("custom_payload cannot be retrieved before ResponseFuture is finalized") return self._custom_payload @@ -2872,6 +2892,7 @@ class ResponseFuture(object): self.query.trace_id = trace_id self._query_trace = QueryTrace(trace_id, self.session) + self._warnings = getattr(response, 'warnings', None) self._custom_payload = getattr(response, 'custom_payload', None) if isinstance(response, ResultMessage): diff --git a/cassandra/protocol.py b/cassandra/protocol.py index d88db589..f7c7176f 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -57,6 +57,7 @@ HEADER_DIRECTION_MASK = 0x80 COMPRESSED_FLAG = 0x01 TRACING_FLAG = 0x02 CUSTOM_PAYLOAD_FLAG = 0x04 +WARNING_FLAG = 0x08 _message_types_by_name = {} _message_types_by_opcode = {} @@ -74,6 +75,7 @@ class _MessageType(object): tracing = False custom_payload = None + warnings = None def to_binary(self, stream_id, protocol_version, compression=None): flags = 0 @@ -133,6 +135,12 @@ def decode_response(protocol_version, user_type_map, stream_id, flags, opcode, b else: trace_id = None + if flags & WARNING_FLAG: + warnings = read_stringlist(body) + flags ^= WARNING_FLAG + else: + warnings = None + if flags & CUSTOM_PAYLOAD_FLAG: custom_payload = read_bytesmap(body) flags ^= CUSTOM_PAYLOAD_FLAG @@ -147,6 +155,12 @@ def decode_response(protocol_version, user_type_map, stream_id, flags, opcode, b msg.stream_id = stream_id msg.trace_id = trace_id msg.custom_payload = custom_payload + msg.warnings = warnings + + if msg.warnings: + for w in msg.warnings: + log.warning("Server warning: %s", w) + return msg