diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 3daf87df..49679f39 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1916,6 +1916,7 @@ class Session(object): _pools = None _profile_manager = None _metrics = None + _request_init_callbacks = None def __init__(self, cluster, hosts, keyspace=None): self.cluster = cluster @@ -1926,6 +1927,7 @@ class Session(object): self._pools = {} self._profile_manager = cluster.profile_manager self._metrics = cluster.metrics + self._request_init_callbacks = [] self._protocol_version = self.cluster.protocol_version self.encoder = Encoder() @@ -2018,6 +2020,7 @@ class Session(object): """ future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile) future._protocol_handler = self.client_protocol_handler + self._on_request(future) future.send_request() return future @@ -2123,6 +2126,35 @@ class Session(object): setattr(clone, attr, value) return clone + def add_request_init_listener(self, fn, *args, **kwargs): + """ + Adds a callback with arguments to be called when any request is created. + + It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created, + and before the request is sent\*. This can be used to create extensions by adding result callbacks to the + response future. + + \* where `response_future` is the :class:`.ResponseFuture` for the request. + + Note that the init callback is done on the client thread creating the request, so you may need to consider + synchronization if you have multiple threads. Any callbacks added to the response future will be executed + on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking apply (see Note in + :meth:`.ResponseFuture.add_callbacks`. + """ + self._request_init_callbacks.append((fn, args, kwargs)) + + def remove_request_init_listener(self, fn, *args, **kwargs): + """ + Removes a callback and arguments from the list. + + See :meth:`.Session.add_request_init_listener`. + """ + self._request_init_callbacks.remove((fn, args, kwargs)) + + def _on_request(self, response_future): + for fn, args, kwargs in self._request_init_callbacks: + fn(response_future, *args, **kwargs) + def prepare(self, query, custom_payload=None): """ Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement` diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index b8435ee2..05d66278 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -150,6 +150,10 @@ .. automethod:: execution_profile_clone_update + .. automethod:: add_request_init_listener + + .. automethod:: remove_request_init_listener + .. autoclass:: ResponseFuture () .. autoattribute:: query