Api documentation for cassandra, cassandra.cluster

This commit is contained in:
Tyler Hobbs
2013-07-01 16:13:46 -05:00
parent 8884e3a7b0
commit 143e2def4b
10 changed files with 301 additions and 119 deletions

1
.gitignore vendored
View File

@@ -6,3 +6,4 @@ MANIFEST
dist
.coverage
cover/
docs/_build/

View File

@@ -3,15 +3,51 @@ __version__ = '.'.join(map(str, __version_info__))
class ConsistencyLevel(object):
"""
Spcifies how many replicas must respond for an operation to be considered
a success. By default, ``ONE`` is used for all operations.
"""
ANY = 0
"""
Only requires that one replica receives the write *or* the coordinator
stores a hint to replay later. Valid only for writes.
"""
ONE = 1
"""
Only one replica needs to respond to consider the operation a success
"""
TWO = 2
"""
Two replicas must respond to consider the operation a success
"""
THREE = 3
"""
Three replicas must respond to consider the operation a success
"""
QUORUM = 4
"""
``ceil(RF/2)`` replicas must respond to consider the operation a success
"""
ALL = 5
"""
All replicas must respond to consider the operation a success
"""
LOCAL_QUORUM = 6
"""
Requires a quorum of replicas in the local datacenter
"""
EACH_QUORUM = 7
"""
Requires a quorum of replicas in each datacenter
"""
ConsistencyLevel.value_to_name = {
ConsistencyLevel.ANY: 'ANY',
@@ -37,10 +73,20 @@ ConsistencyLevel.name_to_value = {
class Unavailable(Exception):
"""
There were not enough live replicas to satisfy the requested consistency
level, so the coordinator node immediately failed the request without
forwarding it to any replicas.
"""
consistency = None
""" The requested :class:`ConsistencyLevel` """
required_replicas = None
""" The number of replicas that needed to be live to complete the operation """
alive_replicas = None
""" The number of replicas that were actually alive """
def __init__(self, message, consistency=None, required_replicas=None, alive_replicas=None):
Exception.__init__(self, message)
@@ -50,10 +96,21 @@ class Unavailable(Exception):
class Timeout(Exception):
"""
Replicas failed to respond to the coordinator node before timing out.
"""
consistency = None
""" The requested :class:`ConsistencyLevel` """
required_responses = None
""" The number of required replica responses """
received_responses = None
"""
The number of replicas that responded before the coordinator timed out
the operation
"""
def __init__(self, message, consistency=None, required_responses=None, received_responses=None):
Exception.__init__(self, message)
@@ -63,8 +120,16 @@ class Timeout(Exception):
class ReadTimeout(Timeout):
"""
A subclass of :exc:`Timeout` for read operations.
"""
data_retrieved = None
"""
A boolean indicating whether the requested data was retrieved
by the coordinator from any replicas before it timed out the
operation
"""
def __init__(self, message, data_retrieved=None, **kwargs):
Timeout.__init__(self, message, **kwargs)
@@ -72,8 +137,14 @@ class ReadTimeout(Timeout):
class WriteTimeout(Timeout):
"""
A subclass of :exc:`Timeout` for write operations.
"""
write_type = None
"""
The type of write operation, enum on :class:`~cassandra.policies.WriteType`
"""
def __init__(self, message, write_type=None, **kwargs):
Timeout.__init__(self, message, **kwargs)

View File

@@ -63,6 +63,8 @@ class NoHostAvailable(Exception):
class Cluster(object):
"""
The main class to use when interacting with a Cassandra cluster.
Typically, one instance of this class will be created for each
separate Cassandra cluster that your application interacts with.
Example usage::
@@ -95,29 +97,29 @@ class Cluster(object):
load_balancing_policy_factory = RoundRobinPolicy
"""
A factory function which creates instances of subclasses of
:cls:`policies.LoadBalancingPolicy`. Defaults to
:cls:`policies.RoundRobinPolicy`.
:class:`policies.LoadBalancingPolicy`. Defaults to
:class:`policies.RoundRobinPolicy`.
"""
reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
"""
An instance of :cls:`policies.ReconnectionPolicy`. Defaults to an instance
of :cls:`ExponentialReconnectionPolicy` with a base delay of one second and
An instance of :class:`policies.ReconnectionPolicy`. Defaults to an instance
of :class:`ExponentialReconnectionPolicy` with a base delay of one second and
a max delay of ten minutes.
"""
retry_policy_factory = RetryPolicy
"""
A factory function which creates instances of
:cls:`policies.RetryPolicy`. Defaults to
:cls:`policies.RetryPolicy`.
:class:`policies.RetryPolicy`. Defaults to
:class:`policies.RetryPolicy`.
"""
conviction_policy_factory = SimpleConvictionPolicy
"""
A factory function which creates instances of
:cls:`policies.ConvictionPolicy`. Defaults to
:cls:`policies.SimpleConvictionPolicy`.
:class:`policies.ConvictionPolicy`. Defaults to
:class:`policies.SimpleConvictionPolicy`.
"""
metrics_enabled = False
@@ -127,7 +129,7 @@ class Cluster(object):
sockopts = None
"""
An optional list of tuples which will be used as *args to
An optional list of tuples which will be used as arguments to
``socket.setsockopt()`` for all created sockets.
"""
@@ -139,7 +141,7 @@ class Cluster(object):
metadata = None
"""
An instance of :cls:`cassandra.metadata.Metadata`.
An instance of :class:`cassandra.metadata.Metadata`.
"""
connection_class = AsyncoreConnection
@@ -151,7 +153,7 @@ class Cluster(object):
By default, ``AsyncoreConnection`` will be used, which uses
the ``asyncore`` module in the Python standard library. The
performance slightly worse than with ``pyev``, but it is
performance is slightly worse than with ``pyev``, but it is
supported on a wider range of systems.
"""
@@ -176,6 +178,10 @@ class Cluster(object):
sockopts=None,
executor_threads=2,
max_schema_agreement_wait=10):
"""
Any of the mutable Cluster attributes may be set as keyword arguments
to the constructor.
"""
self.contact_points = contact_points
self.port = port
@@ -275,7 +281,11 @@ class Cluster(object):
def set_max_connections_per_host(self, host_distance, max_connections):
self._max_connections_per_host[host_distance] = max_connections
def _connection_factory(self, address, *args, **kwargs):
def connection_factory(self, address, *args, **kwargs):
"""
Called to create a new connection with proper configuration.
Intended for internal use only.
"""
if self.auth_provider:
kwargs['credentials'] = self.auth_provider(address)
@@ -295,7 +305,7 @@ class Cluster(object):
def connect(self, keyspace=None):
"""
Creates and returns a new :cls:`~.Session` object. If `keyspace`
Creates and returns a new :class:`~.Session` object. If `keyspace`
is specified, that keyspace will be the default keyspace for
operations on the ``Session``.
"""
@@ -361,19 +371,25 @@ class Cluster(object):
return session
def on_up(self, host):
""" Internal method """
"""
Called when a host is marked up by its :class:`~.HealthMonitor`.
Intended for internal use only.
"""
reconnector = host.get_and_set_reconnection_handler(None)
if reconnector:
reconnector.cancel()
self.prepare_all_queries(host)
self._prepare_all_queries(host)
self.control_connection.on_up(host)
for session in self.sessions:
session.on_up(host)
def on_down(self, host):
""" Internal method """
"""
Called when a host is marked down by its :class:`~.HealthMonitor`.
Intended for internal use only.
"""
self.control_connection.on_down(host)
for session in self.sessions:
session.on_down(host)
@@ -395,32 +411,32 @@ class Cluster(object):
reconnector.start()
def on_add(self, host):
""" Internal method """
self.prepare_all_queries(host)
self.control_connection.on_add(host)
for session in self.sessions: # TODO need to copy/lock?
session.on_add(host)
def on_remove(self, host):
""" Internal method """
self.control_connection.on_remove(host)
for session in self.sessions:
session.on_remove(host)
def add_host(self, address, signal):
""" Internal method """
"""
Called when adding initial contact points and when the control
connection subsequently discovers a new node. Intended for internal
use only.
"""
log.info("Now considering host %s for new connections", address)
new_host = self.metadata.add_host(address)
if new_host and signal:
self.on_add(new_host)
self._prepare_all_queries(new_host)
self.control_connection.on_add(new_host)
for session in self.sessions: # TODO need to copy/lock?
session.on_add(new_host)
return new_host
def remove_host(self, host):
""" Internal method """
"""
Called when the control connection observes that a node has left the
ring. Intended for internal use only.
"""
log.info("Host %s will no longer be considered for new connections", host)
if host and self.metadata.remove_host(host):
self.on_remove(host)
self.control_connection.on_remove(host)
for session in self.sessions:
session.on_remove(host)
def ensure_core_connections(self):
"""
@@ -440,13 +456,13 @@ class Cluster(object):
return self.executor.submit(
self.control_connection.refresh_schema, keyspace, table)
def prepare_all_queries(self, host):
def _prepare_all_queries(self, host):
if not self._prepared_statements:
return
log.debug("Preparing all known prepared statements against host %s" % (host,))
try:
connection = self._connection_factory(host.address)
connection = self.connection_factory(host.address)
try:
self.control_connection.wait_for_schema_agreement(connection)
except:
@@ -477,7 +493,6 @@ class Cluster(object):
log.exception("Error trying to prepare all statements on host %s" % (host,))
def prepare_on_all_sessions(self, md5_id, prepared_statement, excluded_host):
""" Internal """
self._prepared_statements[md5_id] = prepared_statement
for session in self.sessions:
session.prepare_on_all_hosts(prepared_statement.query_string, excluded_host)
@@ -487,7 +502,7 @@ class Session(object):
"""
A collection of connection pools for each host in the cluster.
Instances of this class should not be created directly, only
using :meth:`~.Cluster.connect()`.
using :meth:`.Cluster.connect()`.
Queries and statements can be executed through ``Session`` instances
using the :meth:`~.Session.execute()` and :meth:`~.Session.execute_async()`
@@ -542,18 +557,18 @@ class Session(object):
If an error is encountered while executing the query, an Exception
will be raised.
`query` may be a query string or an instance of :cls:`cassandra.query.Query`.
`query` may be a query string or an instance of :class:`cassandra.query.Query`.
`parameters` may be a sequence or dict of parameters to bind. If a
sequence is used, '%s' should be used the placeholder for each
argument. If a dict is used, '%(name)s' style placeholders must
sequence is used, ``%s`` should be used the placeholder for each
argument. If a dict is used, ``%(name)s`` style placeholders must
be used.
"""
return self.execute_async(query, parameters).result()
def execute_async(self, query, parameters=None):
"""
Execute the given query and return a :cls:`~.ResponseFuture` object
Execute the given query and return a :class:`~.ResponseFuture` object
which callbacks may be attached to for asynchronous response
delivery. You may also call :meth:`~.ResponseFuture.result()`
on the ``ResponseFuture`` to syncronously block for results at
@@ -563,26 +578,25 @@ class Session(object):
>>> session = cluster.connect()
>>> future = session.execute_async("SELECT * FROM mycf")
>>>
>>> def print_results(results):
>>> def log_results(results):
... for row in results:
... print row
>>>
>>> def handle_error(exc):
>>> print exc
>>>
>>> future.add_callbacks(print_results, handle_error)
... log.info("Results: %s", row)
>>> def log_error(exc):
>>> log.error("Operation failed: %s", exc)
>>> future.add_callbacks(log_results, log_error)
Async execution with blocking wait for results::
>>> future = session.execute_async("SELECT * FROM mycf")
>>>
>>> # do other stuff...
>>>
>>> try:
... results = future.result()
... except:
... print "operation failed!"
... log.exception("Operation failed:")
"""
if isinstance(query, basestring):
@@ -609,6 +623,16 @@ class Session(object):
return future
def prepare(self, query):
"""
Prepares a query string, returing a :class:`~cassandra.query.PreparedStatement`
instance which can be used as follows::
>>> session = cluster.connect("mykeyspace")
>>> query = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
>>> prepared = session.prepare(query)
>>> session.execute(prepared.bind((user.id, user.name, user.age)))
"""
message = PrepareMessage(query=query)
future = ResponseFuture(self, message, query=None)
try:
@@ -630,7 +654,10 @@ class Session(object):
return prepared_statement
def prepare_on_all_hosts(self, query, excluded_host):
""" Internal """
"""
Prepare the given query on all hosts, excluding ``excluded_host``.
Intended for internal use only.
"""
for host, pool in self._pools.items():
if host != excluded_host:
future = ResponseFuture(self, PrepareMessage(query=query), None)
@@ -693,14 +720,20 @@ class Session(object):
return previous
def on_up(self, host):
""" Internal """
"""
Called by the parent Cluster instance when a host's :class:`HealthMonitor`
marks it up. Only intended for internal use.
"""
previous_pool = self.add_host(host)
self._load_balancer.on_up(host)
if previous_pool:
previous_pool.shutdown()
def on_down(self, host):
""" Internal """
"""
Called by the parent Cluster instance when a host's :class:`HealthMonitor`
marks it down. Only intended for internal use.
"""
self._load_balancer.on_down(host)
pool = self._pools.pop(host, None)
if pool:
@@ -849,7 +882,7 @@ class ControlConnection(object):
node/token and schema metadata.
"""
log.debug("[control connection] Opening new connection to %s" % (host,))
connection = self._cluster._connection_factory(host.address)
connection = self._cluster.connection_factory(host.address)
log.debug("[control connection] Established new connection, registering "
"watchers and refreshing schema and topology")
@@ -1212,37 +1245,13 @@ _NO_RESULT_YET = object()
class ResponseFuture(object):
"""
An asynchronous response delivery mechanism that is returned from calls
to :meth:`~.Session.execute_async()`.
to :meth:`.Session.execute_async()`.
There are two ways for results to be delivered:
- Asynchronously, by attaching callback and errback functions
- Synchronously, by calling :meth:`~.ResponseFuture.result()`
Callback and errback example::
>>> session = cluster.connect()
>>> future = session.execute_async("SELECT * FROM mycf")
>>>
>>> def print_results(results):
... for row in results:
... print row
>>>
>>> def handle_error(exc):
>>> print exc
>>>
>>> future.add_callbacks(print_results, handle_error)
Example of using ``result()`` to synchronously wait for results::
>>> future = session.execute_async("SELECT * FROM mycf")
>>>
>>> # do other stuff...
>>>
>>> try:
... results = future.result()
... except:
... print "operation failed!"
- Synchronously, by calling :meth:`.result()`
- Asynchronously, by attaching callback and errback functions via
:meth:`.add_callback()`, :meth:`.add_errback()`, and
:meth:`.add_callbacks()`.
"""
session = None
row_factory = None
@@ -1488,6 +1497,19 @@ class ResponseFuture(object):
Return the final result or raise an Exception if errors were
encountered. If the final result or error has not been set
yet, this method will block until that time.
Example usage::
>>> future = session.execute_async("SELECT * FROM mycf")
>>> # do other stuff...
>>> try:
... rows = future.result()
... for row in rows:
... ... # process results
... except:
... log.exception("Operation failed:")
"""
if self._final_result is not _NO_RESULT_YET:
return self._final_result
@@ -1511,11 +1533,24 @@ class ResponseFuture(object):
through as additional positional or keyword arguments to `fn`.
If an error is hit while executing the operation, a callback attached
here will not be called. Use ``add_errback`` or ``add_callbacks``
here will not be called. Use :meth:`.add_errback()` or :meth:`add_callbacks()`
if you wish to handle that case.
If the final result has already been seen when this method is called,
the callback will be called immediately (before this method returns).
Usage example::
>>> session = cluster.connect("mykeyspace")
>>> def handle_results(rows, start_time, should_log=False):
... if should_log:
... log.info("Total time: %f", time.time() - start_time)
... ...
>>> future = session.execute_async("SELECT * FROM users")
>>> future.add_callback(handle_results, time.time(), should_log=True)
"""
if self._final_result is not _NO_RESULT_YET:
fn(self._final_result, *args, **kwargs)
@@ -1525,7 +1560,7 @@ class ResponseFuture(object):
def add_errback(self, fn, *args, **kwargs):
"""
Like :meth:`~.ResultFuture.add_callback()`, but handles error cases.
Like :meth:`.add_callback()`, but handles error cases.
An Exception instance will be passed as the first positional argument
to `fn`.
"""
@@ -1539,8 +1574,26 @@ class ResponseFuture(object):
callback_args=(), callback_kwargs=None,
errback_args=(), errback_kwargs=None):
"""
A convenient combination of :meth:`~.ResultFuture.add_callback()` and
:meth:`~.ResultFuture.add_errback()``.
A convenient combination of :meth:`.add_callback()` and
:meth:`.add_errback()`.
Example usage::
>>> session = cluster.connect()
>>> query = "SELECT * FROM mycf"
>>> future = session.execute_async(query)
>>> def log_results(results, level='debug'):
... for row in results:
... log.log(level, "Result: %s", row)
>>> def log_error(exc, query):
... log.error("Query '%s' failed: %s", query, exc)
>>> future.add_callbacks(
... callback=log_results, callback_kwargs={'level': 'info'},
... errback=log_error, errback_args=(query,))
"""
self.add_callback(callback, *callback_args, **(callback_kwargs or {}))
self.add_errback(errback, *errback_args, **(errback_kwargs or {}))

View File

@@ -247,7 +247,7 @@ class HostConnectionPool(object):
self._conn_available_condition = Condition()
core_conns = session.cluster.get_core_connections_per_host(host_distance)
self._connections = [session.cluster._connection_factory(host.address)
self._connections = [session.cluster.connection_factory(host.address)
for i in range(core_conns)]
self._trash = set()
self.open_count = core_conns
@@ -339,7 +339,7 @@ class HostConnectionPool(object):
self.open_count += 1
try:
conn = self._session.cluster._connection_factory(self.host.address)
conn = self._session.cluster.connection_factory(self.host.address)
with self._lock:
new_connections = self._connections[:] + [conn]
self._connections = new_connections

19
docs/api/cassandra.rst Normal file
View File

@@ -0,0 +1,19 @@
:mod:`cassandra` - Exceptions and Enums
=======================================
.. module:: cassandra
.. autoclass:: ConsistencyLevel
:members:
.. autoexception:: Unavailable()
:members:
.. autoexception:: Timeout()
:members:
.. autoexception:: ReadTimeout()
:members:
.. autoexception:: WriteTimeout()
:members:

View File

@@ -0,0 +1,19 @@
``cassandra.cluster``
=====================
.. module:: cassandra.cluster
.. autoclass:: Cluster ([contact_points=('127.0.0.1',)][, port=9042][, executor_threads=2], **attr_kwargs)
:members:
:exclude-members: on_up, on_down, add_host, remove_host, connection_factory
.. autoclass:: Session ()
:members:
:exclude-members: on_up, on_down, on_add, on_remove, add_host, prepare_on_all_hosts, submit
.. autoclass:: ResponseFuture ()
:members:
:exclude-members: send_request
.. autoexception:: NoHostAvailable ()
:members:

17
docs/api/index.rst Normal file
View File

@@ -0,0 +1,17 @@
API Documentation
=================
Cassandra Modules
-----------------
.. toctree::
:maxdepth: 2
cassandra
cassandra/cluster
cassandra/metadata
cassandra/query
cassandra/pool
cassandra/connection
cassandra/io/asyncorereactor
cassandra/io/pyevreactor

View File

@@ -11,12 +11,14 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
import os
import sys
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
sys.path.insert(0, os.path.abspath('..'))
import cassandra
# -- General configuration -----------------------------------------------------
@@ -48,9 +50,12 @@ copyright = u'2013, Tyler Hobbs'
# built documents.
#
# The short X.Y version.
version = '0.1.4'
version = cassandra.__version__
# The full version, including alpha/beta/rc tags.
release = '0.1.4'
release = cassandra.__version__
autodoc_member_order = 'bysource'
autoclass_content = 'both'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
@@ -91,7 +96,7 @@ pygments_style = 'sphinx'
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
html_theme = 'sphinxdoc'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the

View File

@@ -1,17 +1,14 @@
.. Cassandra Driver documentation master file, created by
sphinx-quickstart on Mon Jul 1 11:40:09 2013.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to Cassandra Driver's documentation!
============================================
Python Cassandra Driver
=======================
Contents:
.. toctree::
:maxdepth: 2
Indices and tables
api/index
Indices and Tables
==================
* :ref:`genindex`

View File

@@ -20,10 +20,10 @@ class HostConnectionPoolTests(unittest.TestCase):
host = Mock(spec=Host, address='ip1')
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
session.cluster._connection_factory.assert_called_once_with(host.address)
session.cluster.connection_factory.assert_called_once_with(host.address)
c = pool.borrow_connection(timeout=0.01)
self.assertIs(c, conn)
@@ -38,10 +38,10 @@ class HostConnectionPoolTests(unittest.TestCase):
host = Mock(spec=Host, address='ip1')
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
session.cluster._connection_factory.assert_called_once_with(host.address)
session.cluster.connection_factory.assert_called_once_with(host.address)
pool.borrow_connection(timeout=0.01)
self.assertEqual(1, conn.in_flight)
@@ -56,10 +56,10 @@ class HostConnectionPoolTests(unittest.TestCase):
host = Mock(spec=Host, address='ip1')
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
session.cluster._connection_factory.assert_called_once_with(host.address)
session.cluster.connection_factory.assert_called_once_with(host.address)
pool.borrow_connection(timeout=0.01)
self.assertEqual(1, conn.in_flight)
@@ -80,7 +80,7 @@ class HostConnectionPoolTests(unittest.TestCase):
host = Mock(spec=Host, address='ip1')
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
session.cluster.get_core_connections_per_host.return_value = 1
# manipulate the core connection setting so that we can
@@ -121,13 +121,13 @@ class HostConnectionPoolTests(unittest.TestCase):
host = Mock(spec=Host, address='ip1')
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
# core conns = 1, max conns = 2
session.cluster.get_max_connections_per_host.return_value = 2
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
session.cluster._connection_factory.assert_called_once_with(host.address)
session.cluster.connection_factory.assert_called_once_with(host.address)
pool.borrow_connection(timeout=0.01)
self.assertEqual(1, conn.in_flight)
@@ -145,10 +145,10 @@ class HostConnectionPoolTests(unittest.TestCase):
host = Mock(spec=Host, address='ip1')
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
session.cluster._connection_factory.assert_called_once_with(host.address)
session.cluster.connection_factory.assert_called_once_with(host.address)
pool.borrow_connection(timeout=0.01)
conn.is_defunct = True
@@ -165,10 +165,10 @@ class HostConnectionPoolTests(unittest.TestCase):
host.monitor = Mock(spec=HealthMonitor)
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
session.cluster._connection_factory.assert_called_once_with(host.address)
session.cluster.connection_factory.assert_called_once_with(host.address)
pool.borrow_connection(timeout=0.01)
conn.is_defunct = True
@@ -185,10 +185,10 @@ class HostConnectionPoolTests(unittest.TestCase):
host = Mock(spec=Host, address='ip1')
session = self.make_session()
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=True)
session.cluster._connection_factory.return_value = conn
session.cluster.connection_factory.return_value = conn
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
session.cluster._connection_factory.assert_called_once_with(host.address)
session.cluster.connection_factory.assert_called_once_with(host.address)
pool.borrow_connection(timeout=0.01)
conn.is_closed = True