From 151b7642419bcb5c9e0fc77f2402819ea2d8c8ce Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 9 Aug 2016 11:55:02 -0500 Subject: [PATCH 1/3] add encoded message size to cluster.ResponseFuture PYTHON-284 --- cassandra/cluster.py | 11 +++++++++-- cassandra/connection.py | 5 +++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index f705d5ed..3daf87df 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3162,6 +3162,11 @@ class ResponseFuture(object): Always ``True`` for non-DDL requests. """ + request_encoded_size = None + """ + Size of the request message sent + """ + session = None row_factory = None message = None @@ -3285,8 +3290,10 @@ class ResponseFuture(object): connection, request_id = pool.borrow_connection(timeout=2.0) self._connection = connection result_meta = self.prepared_statement.result_metadata if self.prepared_statement else [] - connection.send_msg(message, request_id, cb=cb, encoder=self._protocol_handler.encode_message, decoder=self._protocol_handler.decode_message, - result_metadata=result_meta) + self.request_encoded_size = connection.send_msg(message, request_id, cb=cb, + encoder=self._protocol_handler.encode_message, + decoder=self._protocol_handler.decode_message, + result_metadata=result_meta) return request_id except NoConnectionsAvailable as exc: log.debug("All connections for host %s are at capacity, moving to the next host", host) diff --git a/cassandra/connection.py b/cassandra/connection.py index 79910825..19b7964b 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -458,8 +458,9 @@ class Connection(object): # queue the decoder function with the request # this allows us to inject custom functions per request to encode, decode messages self._requests[request_id] = (cb, decoder, result_metadata) - self.push(encoder(msg, request_id, self.protocol_version, compressor=self.compressor, 
allow_beta_protocol_version=self.allow_beta_protocol_version)) - return request_id + msg = encoder(msg, request_id, self.protocol_version, compressor=self.compressor, allow_beta_protocol_version=self.allow_beta_protocol_version) + self.push(msg) + return len(msg) def wait_for_response(self, msg, timeout=None): return self.wait_for_responses(msg, timeout=timeout)[0] From 686ccf114fd7b50d2ba2685bdcea3a59292ed6ec Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 11 Aug 2016 08:36:02 -0500 Subject: [PATCH 2/3] basic request init listener PYTHON-284 --- cassandra/cluster.py | 32 ++++++++++++++++++++++++++++++++ docs/api/cassandra/cluster.rst | 4 ++++ 2 files changed, 36 insertions(+) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 3daf87df..49679f39 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1916,6 +1916,7 @@ class Session(object): _pools = None _profile_manager = None _metrics = None + _request_init_callbacks = None def __init__(self, cluster, hosts, keyspace=None): self.cluster = cluster @@ -1926,6 +1927,7 @@ class Session(object): self._pools = {} self._profile_manager = cluster.profile_manager self._metrics = cluster.metrics + self._request_init_callbacks = [] self._protocol_version = self.cluster.protocol_version self.encoder = Encoder() @@ -2018,6 +2020,7 @@ class Session(object): """ future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile) future._protocol_handler = self.client_protocol_handler + self._on_request(future) future.send_request() return future @@ -2123,6 +2126,35 @@ class Session(object): setattr(clone, attr, value) return clone + def add_request_init_listener(self, fn, *args, **kwargs): + """ + Adds a callback with arguments to be called when any request is created. + + It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created, + and before the request is sent\*. 
This can be used to create extensions by adding result callbacks to the + response future. + + \* where `response_future` is the :class:`.ResponseFuture` for the request. + + Note that the init callback is done on the client thread creating the request, so you may need to consider + synchronization if you have multiple threads. Any callbacks added to the response future will be executed + on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking apply (see Note in + :meth:`.ResponseFuture.add_callbacks`. + """ + self._request_init_callbacks.append((fn, args, kwargs)) + + def remove_request_init_listener(self, fn, *args, **kwargs): + """ + Removes a callback and arguments from the list. + + See :meth:`.Session.add_request_init_listener`. + """ + self._request_init_callbacks.remove((fn, args, kwargs)) + + def _on_request(self, response_future): + for fn, args, kwargs in self._request_init_callbacks: + fn(response_future, *args, **kwargs) + def prepare(self, query, custom_payload=None): """ Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement` diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index b8435ee2..05d66278 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -150,6 +150,10 @@ .. automethod:: execution_profile_clone_update + .. automethod:: add_request_init_listener + + .. automethod:: remove_request_init_listener + .. autoclass:: ResponseFuture () .. 
autoattribute:: query From 5207686badcba16c07799aeb63edc6c02a2190a2 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 11 Aug 2016 16:48:07 -0500 Subject: [PATCH 3/3] Add example request init listener PYTHON-284 --- cassandra/cluster.py | 3 + examples/README.rst | 8 +++ examples/request_init_listener.py | 107 ++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 examples/README.rst create mode 100644 examples/request_init_listener.py diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 49679f39..89cac44a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2140,6 +2140,9 @@ class Session(object): synchronization if you have multiple threads. Any callbacks added to the response future will be executed on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking apply (see Note in :meth:`.ResponseFuture.add_callbacks`. + + See `this example <https://github.com/datastax/python-driver/blob/master/examples/request_init_listener.py>`_ in the + source tree for an example. """ self._request_init_callbacks.append((fn, args, kwargs)) diff --git a/examples/README.rst b/examples/README.rst new file mode 100644 index 00000000..889f9111 --- /dev/null +++ b/examples/README.rst @@ -0,0 +1,8 @@ +Driver Examples +=============== +This directory will contain a set of scripts demonstrating driver APIs or integration techniques. It will not be exhaustive, but will contain examples where they are too involved, or +open-ended to include inline in the docstrings. In that case, they should be referenced from the docstrings + +Features +-------- +* `request_init_listener.py <request_init_listener.py>`_ A script demonstrating how to register a session request listener and use it to track alternative metrics about requests (size, for example). diff --git a/examples/request_init_listener.py b/examples/request_init_listener.py new file mode 100644 index 00000000..7d662b15 --- /dev/null +++ b/examples/request_init_listener.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python +# Copyright 2013-2016 DataStax, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script shows an example "request init listener" which can be registered to track certain request metrics +# for a session. In this case we're just accumulating total request and error counts, as well as some statistics +# about the encoded request size. Note that the counts would be available using the internal 'metrics' tracking -- +# this is just demonstrating a way to track a few custom attributes. + +from __future__ import print_function +from cassandra.cluster import Cluster +from greplin import scales + +import pprint +pp = pprint.PrettyPrinter(indent=2) + + +class RequestAnalyzer(object): + """ + Class used to track request and error counts for a Session. + + Also computes statistics on encoded request size. + """ + + requests = scales.PmfStat('request size') + errors = scales.IntStat('errors') + + def __init__(self, session): + scales.init(self, '/cassandra') + # each instance will be registered with a session, and receive a callback for each request generated + session.add_request_init_listener(self.on_request) + + def on_request(self, rf): + # This callback is invoked each time a request is created, on the thread creating the request. 
+ # We can use this to count events, or add callbacks + rf.add_callbacks(self.on_success, self.on_error, callback_args=(rf,), errback_args=(rf,)) + + def on_success(self, _, response_future): + # future callback on a successful request; just record the size + self.requests.addValue(response_future.request_encoded_size) + + def on_error(self, _, response_future): + # future callback for failed; record size and increment errors + self.requests.addValue(response_future.request_encoded_size) + self.errors += 1 + + def __str__(self): + # just extracting request count from the size stats (which are recorded on all requests) + request_sizes = dict(self.requests) + count = request_sizes.pop('count') + return "%d requests (%d errors)\nRequest size statistics:\n%s" % (count, self.errors, pp.pformat(request_sizes)) + + +# connect a session +session = Cluster().connect() + +# attach a listener to this session +ra = RequestAnalyzer(session) + +session.execute("SELECT release_version FROM system.local") +session.execute("SELECT release_version FROM system.local") + +print(ra) +# 2 requests (0 errors) +# Request size statistics: +# { '75percentile': 74, +# '95percentile': 74, +# '98percentile': 74, +# '999percentile': 74, +# '99percentile': 74, +# 'max': 74, +# 'mean': 74.0, +# 'median': 74.0, +# 'min': 74, +# 'stddev': 0.0} + +try: + # intentional error to show that count increase + session.execute("syntax err") +except Exception as e: + pass + +print() +print(ra) # note: the counts are updated, but the stats are not because scales only updates every 20s +# 3 requests (1 errors) +# Request size statistics: +# { '75percentile': 74, +# '95percentile': 74, +# '98percentile': 74, +# '999percentile': 74, +# '99percentile': 74, +# 'max': 74, +# 'mean': 74.0, +# 'median': 74.0, +# 'min': 74, +# 'stddev': 0.0}