Merge pull request #646 from datastax/284

PYTHON-284 - request size attribute, request init listener
This commit is contained in:
Greg Bestland
2016-08-15 12:20:15 -05:00
committed by GitHub
5 changed files with 166 additions and 4 deletions


@@ -1916,6 +1916,7 @@ class Session(object):
_pools = None
_profile_manager = None
_metrics = None
_request_init_callbacks = None
def __init__(self, cluster, hosts, keyspace=None):
self.cluster = cluster
@@ -1926,6 +1927,7 @@ class Session(object):
self._pools = {}
self._profile_manager = cluster.profile_manager
self._metrics = cluster.metrics
self._request_init_callbacks = []
self._protocol_version = self.cluster.protocol_version
self.encoder = Encoder()
@@ -2018,6 +2020,7 @@ class Session(object):
"""
future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile)
future._protocol_handler = self.client_protocol_handler
self._on_request(future)
future.send_request()
return future
@@ -2123,6 +2126,38 @@ class Session(object):
setattr(clone, attr, value)
return clone
def add_request_init_listener(self, fn, *args, **kwargs):
"""
Adds a callback with arguments to be called when any request is created.
It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created, and before
the request is sent, where `response_future` is the :class:`.ResponseFuture` for the request. This can be used
to create extensions by adding result callbacks to the response future.
Note that the init callback is invoked on the client thread creating the request, so you may need to consider
synchronization if you have multiple threads. Any callbacks added to the response future will be executed
on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking applies (see the note in
:meth:`.ResponseFuture.add_callbacks`).
See `this example <https://github.com/datastax/python-driver/blob/master/examples/request_init_listener.py>`_ in the
source tree for a runnable demonstration.
"""
self._request_init_callbacks.append((fn, args, kwargs))
def remove_request_init_listener(self, fn, *args, **kwargs):
"""
Removes a callback and arguments from the list.
See :meth:`.Session.add_request_init_listener`.
"""
self._request_init_callbacks.remove((fn, args, kwargs))
def _on_request(self, response_future):
for fn, args, kwargs in self._request_init_callbacks:
fn(response_future, *args, **kwargs)
def prepare(self, query, custom_payload=None):
"""
Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
@@ -3162,6 +3197,11 @@ class ResponseFuture(object):
Always ``True`` for non-DDL requests.
"""
request_encoded_size = None
"""
The size, in bytes, of the encoded request message that was sent.
"""
session = None
row_factory = None
message = None
@@ -3285,8 +3325,10 @@ class ResponseFuture(object):
connection, request_id = pool.borrow_connection(timeout=2.0)
self._connection = connection
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
connection.send_msg(message, request_id, cb=cb, encoder=self._protocol_handler.encode_message, decoder=self._protocol_handler.decode_message,
result_metadata=result_meta)
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
encoder=self._protocol_handler.encode_message,
decoder=self._protocol_handler.decode_message,
result_metadata=result_meta)
return request_id
except NoConnectionsAvailable as exc:
log.debug("All connections for host %s are at capacity, moving to the next host", host)
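
For context, here is a minimal sketch of how the new listener hooks might be used from application code. The `log_request` callback and the extra `"app-metrics"` argument are illustrative, not part of this change:

from cassandra.cluster import Cluster

def log_request(response_future, tag):
    # invoked on the thread creating the request, before the request is sent
    print(tag, response_future.query)

session = Cluster().connect()
# extra positional/keyword arguments are passed through to the callback
session.add_request_init_listener(log_request, "app-metrics")
session.execute("SELECT release_version FROM system.local")
# removal requires the same fn and arguments used at registration
session.remove_request_init_listener(log_request, "app-metrics")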


@@ -458,8 +458,9 @@ class Connection(object):
# queue the decoder function with the request
# this allows us to inject custom functions per request to encode, decode messages
self._requests[request_id] = (cb, decoder, result_metadata)
self.push(encoder(msg, request_id, self.protocol_version, compressor=self.compressor, allow_beta_protocol_version=self.allow_beta_protocol_version))
return request_id
msg = encoder(msg, request_id, self.protocol_version, compressor=self.compressor, allow_beta_protocol_version=self.allow_beta_protocol_version)
self.push(msg)
return len(msg)
def wait_for_response(self, msg, timeout=None):
return self.wait_for_responses(msg, timeout=timeout)[0]
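
With the encoded length returned from send_msg and stored on the future, the size can be inspected as soon as the request has been sent. A quick check (assuming an existing `session`; the query is arbitrary) might look like:

future = session.execute_async("SELECT release_version FROM system.local")
future.result()  # wait for the response
print(future.request_encoded_size)  # length in bytes of the encoded request message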


@@ -150,6 +150,10 @@
.. automethod:: execution_profile_clone_update
.. automethod:: add_request_init_listener
.. automethod:: remove_request_init_listener
.. autoclass:: ResponseFuture ()
.. autoattribute:: query

examples/README.rst (new file)

@@ -0,0 +1,8 @@
Driver Examples
===============
This directory will contain a set of scripts demonstrating driver APIs or integration techniques. It will not be exhaustive, but will hold examples
that are too involved or open-ended to include inline in the docstrings; those docstrings should reference the examples here.
Features
--------
* `request_init_listener.py <request_init_listener.py>`_ A script demonstrating how to register a session request listener and use it to track alternative metrics about requests (size, for example).


@@ -0,0 +1,107 @@
#!/usr/bin/env python
# Copyright 2013-2016 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script shows an example "request init listener" which can be registered to track certain request metrics
# for a session. In this case we're just accumulating total request and error counts, as well as some statistics
# about the encoded request size. Note that the counts would be available using the internal 'metrics' tracking --
# this is just demonstrating a way to track a few custom attributes.
from __future__ import print_function
from cassandra.cluster import Cluster
from greplin import scales
import pprint
pp = pprint.PrettyPrinter(indent=2)
class RequestAnalyzer(object):
"""
Class used to track request and error counts for a Session.
Also computes statistics on encoded request size.
"""
requests = scales.PmfStat('request size')
errors = scales.IntStat('errors')
def __init__(self, session):
scales.init(self, '/cassandra')
# each instance will be registered with a session, and receive a callback for each request generated
session.add_request_init_listener(self.on_request)
def on_request(self, rf):
# This callback is invoked each time a request is created, on the thread creating the request.
# We can use this to count events, or add callbacks
rf.add_callbacks(self.on_success, self.on_error, callback_args=(rf,), errback_args=(rf,))
def on_success(self, _, response_future):
# future callback on a successful request; just record the size
self.requests.addValue(response_future.request_encoded_size)
def on_error(self, _, response_future):
# future errback for a failed request; record the size and increment the error count
self.requests.addValue(response_future.request_encoded_size)
self.errors += 1
def __str__(self):
# just extracting request count from the size stats (which are recorded on all requests)
request_sizes = dict(self.requests)
count = request_sizes.pop('count')
return "%d requests (%d errors)\nRequest size statistics:\n%s" % (count, self.errors, pp.pformat(request_sizes))
# connect a session
session = Cluster().connect()
# attach a listener to this session
ra = RequestAnalyzer(session)
session.execute("SELECT release_version FROM system.local")
session.execute("SELECT release_version FROM system.local")
print(ra)
# 2 requests (0 errors)
# Request size statistics:
# { '75percentile': 74,
# '95percentile': 74,
# '98percentile': 74,
# '999percentile': 74,
# '99percentile': 74,
# 'max': 74,
# 'mean': 74.0,
# 'median': 74.0,
# 'min': 74,
# 'stddev': 0.0}
try:
# intentional error to show the error count increasing
session.execute("syntax err")
except Exception as e:
pass
print()
print(ra) # note: the counts are updated, but the stats are not because scales only updates every 20s
# 3 requests (1 errors)
# Request size statistics:
# { '75percentile': 74,
# '95percentile': 74,
# '98percentile': 74,
# '999percentile': 74,
# '99percentile': 74,
# 'max': 74,
# 'mean': 74.0,
# 'median': 74.0,
# 'min': 74,
# 'stddev': 0.0}