diff --git a/.gitignore b/.gitignore
index ffd5db58..b875ff20 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ MANIFEST
 dist
 .coverage
 cover/
+docs/_build/
diff --git a/cassandra/__init__.py b/cassandra/__init__.py
index cb93f3d3..25c4aaf2 100644
--- a/cassandra/__init__.py
+++ b/cassandra/__init__.py
@@ -3,15 +3,51 @@ __version__ = '.'.join(map(str, __version_info__))
 
 
 class ConsistencyLevel(object):
+    """
+    Specifies how many replicas must respond for an operation to be considered
+    a success. By default, ``ONE`` is used for all operations.
+    """
 
     ANY = 0
+    """
+    Only requires that one replica receives the write *or* the coordinator
+    stores a hint to replay later. Valid only for writes.
+    """
+
     ONE = 1
+    """
+    Only one replica needs to respond to consider the operation a success
+    """
+
     TWO = 2
+    """
+    Two replicas must respond to consider the operation a success
+    """
+
     THREE = 3
+    """
+    Three replicas must respond to consider the operation a success
+    """
+
     QUORUM = 4
+    """
+    A majority of replicas (``floor(RF/2) + 1``) must respond to consider the operation a success
+    """
+
     ALL = 5
+    """
+    All replicas must respond to consider the operation a success
+    """
+
     LOCAL_QUORUM = 6
+    """
+    Requires a quorum of replicas in the local datacenter
+    """
+
     EACH_QUORUM = 7
+    """
+    Requires a quorum of replicas in each datacenter
+    """
 
 ConsistencyLevel.value_to_name = {
     ConsistencyLevel.ANY: 'ANY',
@@ -37,10 +73,20 @@ ConsistencyLevel.name_to_value = {
 
 
 class Unavailable(Exception):
+    """
+    There were not enough live replicas to satisfy the requested consistency
+    level, so the coordinator node immediately failed the request without
+    forwarding it to any replicas.
+    """
 
     consistency = None
+    """ The requested :class:`ConsistencyLevel` """
+
     required_replicas = None
+    """ The number of replicas that needed to be live to complete the operation """
+
     alive_replicas = None
+    """ The number of replicas that were actually alive """
 
     def __init__(self, message, consistency=None, required_replicas=None, alive_replicas=None):
         Exception.__init__(self, message)
@@ -50,10 +96,21 @@ class Unavailable(Exception):
 
 
 class Timeout(Exception):
+    """
+    Replicas failed to respond to the coordinator node before timing out.
+    """
 
     consistency = None
+    """ The requested :class:`ConsistencyLevel` """
+
     required_responses = None
+    """ The number of required replica responses """
+
     received_responses = None
+    """
+    The number of replicas that responded before the coordinator timed out
+    the operation
+    """
 
     def __init__(self, message, consistency=None, required_responses=None, received_responses=None):
         Exception.__init__(self, message)
@@ -63,8 +120,16 @@ class Timeout(Exception):
 
 
 class ReadTimeout(Timeout):
+    """
+    A subclass of :exc:`Timeout` for read operations.
+    """
 
     data_retrieved = None
+    """
+    A boolean indicating whether the requested data was retrieved
+    by the coordinator from any replicas before it timed out the
+    operation
+    """
 
     def __init__(self, message, data_retrieved=None, **kwargs):
         Timeout.__init__(self, message, **kwargs)
@@ -72,8 +137,14 @@ class ReadTimeout(Timeout):
 
 
 class WriteTimeout(Timeout):
+    """
+    A subclass of :exc:`Timeout` for write operations.
+    """
 
     write_type = None
+    """
+    The type of write operation, enum on :class:`~cassandra.policies.WriteType`
+    """
 
     def __init__(self, message, write_type=None, **kwargs):
         Timeout.__init__(self, message, **kwargs)
diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 513c83ca..eeaf270a 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -63,6 +63,8 @@ class NoHostAvailable(Exception):
 class Cluster(object):
     """
     The main class to use when interacting with a Cassandra cluster.
+    Typically, one instance of this class will be created for each
+    separate Cassandra cluster that your application interacts with.
 
     Example usage::
 
@@ -95,29 +97,29 @@ class Cluster(object):
     load_balancing_policy_factory = RoundRobinPolicy
     """
     A factory function which creates instances of subclasses of
-    :cls:`policies.LoadBalancingPolicy`. Defaults to
-    :cls:`policies.RoundRobinPolicy`.
+    :class:`policies.LoadBalancingPolicy`. Defaults to
+    :class:`policies.RoundRobinPolicy`.
     """
 
     reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
     """
-    An instance of :cls:`policies.ReconnectionPolicy`. Defaults to an instance
-    of :cls:`ExponentialReconnectionPolicy` with a base delay of one second and
+    An instance of :class:`policies.ReconnectionPolicy`. Defaults to an instance
+    of :class:`ExponentialReconnectionPolicy` with a base delay of one second and
     a max delay of ten minutes.
     """
 
     retry_policy_factory = RetryPolicy
     """
     A factory function which creates instances of
-    :cls:`policies.RetryPolicy`. Defaults to
-    :cls:`policies.RetryPolicy`.
+    :class:`policies.RetryPolicy`. Defaults to
+    :class:`policies.RetryPolicy`.
     """
 
     conviction_policy_factory = SimpleConvictionPolicy
     """
     A factory function which creates instances of
-    :cls:`policies.ConvictionPolicy`. Defaults to
-    :cls:`policies.SimpleConvictionPolicy`.
+    :class:`policies.ConvictionPolicy`. Defaults to
+    :class:`policies.SimpleConvictionPolicy`.
     """
 
     metrics_enabled = False
@@ -127,7 +129,7 @@ class Cluster(object):
 
     sockopts = None
     """
-    An optional list of tuples which will be used as *args to
+    An optional list of tuples which will be used as arguments to
     ``socket.setsockopt()`` for all created sockets.
     """
 
@@ -139,7 +141,7 @@ class Cluster(object):
 
     metadata = None
     """
-    An instance of :cls:`cassandra.metadata.Metadata`.
+    An instance of :class:`cassandra.metadata.Metadata`.
     """
 
     connection_class = AsyncoreConnection
@@ -151,7 +153,7 @@ class Cluster(object):
 
     By default, ``AsyncoreConnection`` will be used, which uses the
     ``asyncore`` module in the Python standard library. The
-    performance slightly worse than with ``pyev``, but it is
+    performance is slightly worse than with ``pyev``, but it is
     supported on a wider range of systems.
 
@@ -176,6 +178,10 @@ class Cluster(object):
                  sockopts=None,
                  executor_threads=2,
                  max_schema_agreement_wait=10):
+        """
+        Any of the mutable Cluster attributes may be set as keyword arguments
+        to the constructor.
+        """
         self.contact_points = contact_points
         self.port = port
 
@@ -275,7 +281,11 @@ class Cluster(object):
     def set_max_connections_per_host(self, host_distance, max_connections):
         self._max_connections_per_host[host_distance] = max_connections
 
-    def _connection_factory(self, address, *args, **kwargs):
+    def connection_factory(self, address, *args, **kwargs):
+        """
+        Called to create a new connection with proper configuration.
+        Intended for internal use only.
+        """
         if self.auth_provider:
             kwargs['credentials'] = self.auth_provider(address)
 
@@ -295,7 +305,7 @@ class Cluster(object):
 
     def connect(self, keyspace=None):
         """
-        Creates and returns a new :cls:`~.Session` object. If `keyspace`
+        Creates and returns a new :class:`~.Session` object. If `keyspace`
         is specified, that keyspace will be the default keyspace for
         operations on the ``Session``.
         """
@@ -361,19 +371,25 @@ class Cluster(object):
         return session
 
     def on_up(self, host):
-        """ Internal method """
+        """
+        Called when a host is marked up by its :class:`~.HealthMonitor`.
+        Intended for internal use only.
+        """
         reconnector = host.get_and_set_reconnection_handler(None)
         if reconnector:
             reconnector.cancel()
 
-        self.prepare_all_queries(host)
+        self._prepare_all_queries(host)
 
         self.control_connection.on_up(host)
         for session in self.sessions:
             session.on_up(host)
 
     def on_down(self, host):
-        """ Internal method """
+        """
+        Called when a host is marked down by its :class:`~.HealthMonitor`.
+        Intended for internal use only.
+        """
         self.control_connection.on_down(host)
         for session in self.sessions:
             session.on_down(host)
@@ -395,32 +411,32 @@ class Cluster(object):
 
         reconnector.start()
 
-    def on_add(self, host):
-        """ Internal method """
-        self.prepare_all_queries(host)
-        self.control_connection.on_add(host)
-        for session in self.sessions:  # TODO need to copy/lock?
-            session.on_add(host)
-
-    def on_remove(self, host):
-        """ Internal method """
-        self.control_connection.on_remove(host)
-        for session in self.sessions:
-            session.on_remove(host)
-
     def add_host(self, address, signal):
-        """ Internal method """
+        """
+        Called when adding initial contact points and when the control
+        connection subsequently discovers a new node. Intended for internal
+        use only.
+        """
         log.info("Now considering host %s for new connections", address)
         new_host = self.metadata.add_host(address)
         if new_host and signal:
-            self.on_add(new_host)
+            self._prepare_all_queries(new_host)
+            self.control_connection.on_add(new_host)
+            for session in self.sessions:  # TODO need to copy/lock?
+                session.on_add(new_host)
+
         return new_host
 
     def remove_host(self, host):
-        """ Internal method """
+        """
+        Called when the control connection observes that a node has left the
+        ring. Intended for internal use only.
+        """
         log.info("Host %s will no longer be considered for new connections", host)
         if host and self.metadata.remove_host(host):
-            self.on_remove(host)
+            self.control_connection.on_remove(host)
+            for session in self.sessions:
+                session.on_remove(host)
 
     def ensure_core_connections(self):
         """
@@ -440,13 +456,13 @@ class Cluster(object):
         return self.executor.submit(
             self.control_connection.refresh_schema, keyspace, table)
 
-    def prepare_all_queries(self, host):
+    def _prepare_all_queries(self, host):
         if not self._prepared_statements:
             return
 
         log.debug("Preparing all known prepared statements against host %s" % (host,))
         try:
-            connection = self._connection_factory(host.address)
+            connection = self.connection_factory(host.address)
             try:
                 self.control_connection.wait_for_schema_agreement(connection)
             except:
@@ -477,7 +493,6 @@ class Cluster(object):
             log.exception("Error trying to prepare all statements on host %s" % (host,))
 
     def prepare_on_all_sessions(self, md5_id, prepared_statement, excluded_host):
-        """ Internal """
         self._prepared_statements[md5_id] = prepared_statement
         for session in self.sessions:
             session.prepare_on_all_hosts(prepared_statement.query_string, excluded_host)
@@ -487,7 +502,7 @@ class Session(object):
     """
     A collection of connection pools for each host in the cluster.
     Instances of this class should not be created directly, only
-    using :meth:`~.Cluster.connect()`.
+    using :meth:`.Cluster.connect()`.
 
     Queries and statements can be executed through ``Session`` instances
     using the :meth:`~.Session.execute()` and :meth:`~.Session.execute_async()`
@@ -542,18 +557,18 @@ class Session(object):
         If an error is encountered while executing the query, an Exception
         will be raised.
 
-        `query` may be a query string or an instance of :cls:`cassandra.query.Query`.
+        `query` may be a query string or an instance of :class:`cassandra.query.Query`.
 
         `parameters` may be a sequence or dict of parameters to bind. If a
-        sequence is used, '%s' should be used the placeholder for each
-        argument. If a dict is used, '%(name)s' style placeholders must
+        sequence is used, ``%s`` should be used as the placeholder for each
+        argument. If a dict is used, ``%(name)s`` style placeholders must
         be used.
         """
         return self.execute_async(query, parameters).result()
 
     def execute_async(self, query, parameters=None):
         """
-        Execute the given query and return a :cls:`~.ResponseFuture` object
+        Execute the given query and return a :class:`~.ResponseFuture` object
         which callbacks may be attached to for asynchronous response delivery.
         You may also call :meth:`~.ResponseFuture.result()` on the
         ``ResponseFuture`` to synchronously block for results at any time.
 
         Example usage::
 
             >>> session = cluster.connect()
             >>> future = session.execute_async("SELECT * FROM mycf")
-            >>>
-            >>> def print_results(results):
+
+            >>> def log_results(results):
             ...     for row in results:
-            ...         print row
-            >>>
-            >>> def handle_error(exc):
-            >>>     print exc
-            >>>
-            >>> future.add_callbacks(print_results, handle_error)
+            ...         log.info("Results: %s", row)
+
+            >>> def log_error(exc):
+            >>>     log.error("Operation failed: %s", exc)
+
+            >>> future.add_callbacks(log_results, log_error)
 
         Async execution with blocking wait for results::
 
             >>> future = session.execute_async("SELECT * FROM mycf")
-            >>>
             >>> # do other stuff...
-            >>>
+
             >>> try:
             ...     results = future.result()
             ... except:
-            ...     print "operation failed!"
+            ...     log.exception("Operation failed:")
 
         """
         if isinstance(query, basestring):
@@ -609,6 +623,16 @@ class Session(object):
         return future
 
     def prepare(self, query):
+        """
+        Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
+        instance which can be used as follows::
+
+            >>> session = cluster.connect("mykeyspace")
+            >>> query = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
+            >>> prepared = session.prepare(query)
+            >>> session.execute(prepared.bind((user.id, user.name, user.age)))
+
+        """
         message = PrepareMessage(query=query)
         future = ResponseFuture(self, message, query=None)
         try:
@@ -630,7 +654,10 @@ class Session(object):
         return prepared_statement
 
     def prepare_on_all_hosts(self, query, excluded_host):
-        """ Internal """
+        """
+        Prepare the given query on all hosts, excluding ``excluded_host``.
+        Intended for internal use only.
+        """
         for host, pool in self._pools.items():
             if host != excluded_host:
                 future = ResponseFuture(self, PrepareMessage(query=query), None)
@@ -693,14 +720,20 @@ class Session(object):
         return previous
 
     def on_up(self, host):
-        """ Internal """
+        """
+        Called by the parent Cluster instance when a host's :class:`HealthMonitor`
+        marks it up. Only intended for internal use.
+        """
         previous_pool = self.add_host(host)
         self._load_balancer.on_up(host)
         if previous_pool:
             previous_pool.shutdown()
 
     def on_down(self, host):
-        """ Internal """
+        """
+        Called by the parent Cluster instance when a host's :class:`HealthMonitor`
+        marks it down. Only intended for internal use.
+        """
         self._load_balancer.on_down(host)
         pool = self._pools.pop(host, None)
         if pool:
@@ -849,7 +882,7 @@ class ControlConnection(object):
         node/token and schema metadata.
         """
         log.debug("[control connection] Opening new connection to %s" % (host,))
-        connection = self._cluster._connection_factory(host.address)
+        connection = self._cluster.connection_factory(host.address)
 
         log.debug("[control connection] Established new connection, registering "
                   "watchers and refreshing schema and topology")
@@ -1212,37 +1245,13 @@ _NO_RESULT_YET = object()
 
 class ResponseFuture(object):
     """
     An asynchronous response delivery mechanism that is returned from calls
-    to :meth:`~.Session.execute_async()`.
+    to :meth:`.Session.execute_async()`.
 
     There are two ways for results to be delivered:
-     - Asynchronously, by attaching callback and errback functions
-     - Synchronously, by calling :meth:`~.ResponseFuture.result()`
-
-    Callback and errback example::
-
-        >>> session = cluster.connect()
-        >>> future = session.execute_async("SELECT * FROM mycf")
-        >>>
-        >>> def print_results(results):
-        ...     for row in results:
-        ...         print row
-        >>>
-        >>> def handle_error(exc):
-        >>>     print exc
-        >>>
-        >>> future.add_callbacks(print_results, handle_error)
-
-    Example of using ``result()`` to synchronously wait for results::
-
-        >>> future = session.execute_async("SELECT * FROM mycf")
-        >>>
-        >>> # do other stuff...
-        >>>
-        >>> try:
-        ...     results = future.result()
-        ... except:
-        ...     print "operation failed!"
-
+     - Synchronously, by calling :meth:`.result()`
+     - Asynchronously, by attaching callback and errback functions via
+       :meth:`.add_callback()`, :meth:`.add_errback()`, and
+       :meth:`.add_callbacks()`.
     """
     session = None
     row_factory = None
@@ -1488,6 +1497,19 @@ class ResponseFuture(object):
         Return the final result or raise an Exception if errors were
         encountered. If the final result or error has not been set
         yet, this method will block until that time.
+
+        Example usage::
+
+            >>> future = session.execute_async("SELECT * FROM mycf")
+            >>> # do other stuff...
+
+            >>> try:
+            ...     rows = future.result()
+            ...     for row in rows:
+            ...         ... # process results
+            ... except:
+            ...     log.exception("Operation failed:")
+
         """
         if self._final_result is not _NO_RESULT_YET:
             return self._final_result
@@ -1511,11 +1533,24 @@ class ResponseFuture(object):
         through as additional positional or keyword arguments to `fn`.
 
         If an error is hit while executing the operation, a callback attached
-        here will not be called. Use ``add_errback`` or ``add_callbacks``
+        here will not be called. Use :meth:`.add_errback()` or :meth:`.add_callbacks()`
         if you wish to handle that case.
 
         If the final result has already been seen when this method is called,
         the callback will be called immediately (before this method returns).
+
+        Usage example::
+
+            >>> session = cluster.connect("mykeyspace")
+
+            >>> def handle_results(rows, start_time, should_log=False):
+            ...     if should_log:
+            ...         log.info("Total time: %f", time.time() - start_time)
+            ...     ...
+
+            >>> future = session.execute_async("SELECT * FROM users")
+            >>> future.add_callback(handle_results, time.time(), should_log=True)
+
         """
         if self._final_result is not _NO_RESULT_YET:
             fn(self._final_result, *args, **kwargs)
@@ -1525,7 +1560,7 @@ class ResponseFuture(object):
 
     def add_errback(self, fn, *args, **kwargs):
         """
-        Like :meth:`~.ResultFuture.add_callback()`, but handles error cases.
+        Like :meth:`.add_callback()`, but handles error cases.
         An Exception instance will be passed as the first positional argument
         to `fn`.
         """
@@ -1539,8 +1574,26 @@ class ResponseFuture(object):
                       callback_args=(), callback_kwargs=None,
                       errback_args=(), errback_kwargs=None):
         """
-        A convenient combination of :meth:`~.ResultFuture.add_callback()` and
-        :meth:`~.ResultFuture.add_errback()``.
+        A convenient combination of :meth:`.add_callback()` and
+        :meth:`.add_errback()`.
+
+        Example usage::
+
+            >>> session = cluster.connect()
+            >>> query = "SELECT * FROM mycf"
+            >>> future = session.execute_async(query)
+
+            >>> def log_results(results, level=logging.DEBUG):
+            ...     for row in results:
+            ...         log.log(level, "Result: %s", row)
+
+            >>> def log_error(exc, query):
+            ...     log.error("Query '%s' failed: %s", query, exc)
+
+            >>> future.add_callbacks(
+            ...     callback=log_results, callback_kwargs={'level': logging.INFO},
+            ...     errback=log_error, errback_args=(query,))
+
         """
         self.add_callback(callback, *callback_args, **(callback_kwargs or {}))
         self.add_errback(errback, *errback_args, **(errback_kwargs or {}))
diff --git a/cassandra/pool.py b/cassandra/pool.py
index 3f9e2a00..7b83e29a 100644
--- a/cassandra/pool.py
+++ b/cassandra/pool.py
@@ -247,7 +247,7 @@ class HostConnectionPool(object):
         self._conn_available_condition = Condition()
 
         core_conns = session.cluster.get_core_connections_per_host(host_distance)
-        self._connections = [session.cluster._connection_factory(host.address)
+        self._connections = [session.cluster.connection_factory(host.address)
                              for i in range(core_conns)]
         self._trash = set()
         self.open_count = core_conns
@@ -339,7 +339,7 @@ class HostConnectionPool(object):
             self.open_count += 1
 
         try:
-            conn = self._session.cluster._connection_factory(self.host.address)
+            conn = self._session.cluster.connection_factory(self.host.address)
             with self._lock:
                 new_connections = self._connections[:] + [conn]
                 self._connections = new_connections
diff --git a/docs/api/cassandra.rst b/docs/api/cassandra.rst
new file mode 100644
index 00000000..ee95e414
--- /dev/null
+++ b/docs/api/cassandra.rst
@@ -0,0 +1,19 @@
+:mod:`cassandra` - Exceptions and Enums
+=======================================
+
+.. module:: cassandra
+
+.. autoclass:: ConsistencyLevel
+   :members:
+
+.. autoexception:: Unavailable()
+   :members:
+
+.. autoexception:: Timeout()
+   :members:
+
+.. autoexception:: ReadTimeout()
+   :members:
+
+.. autoexception:: WriteTimeout()
+   :members:
diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst
new file mode 100644
index 00000000..bf0bb3d5
--- /dev/null
+++ b/docs/api/cassandra/cluster.rst
@@ -0,0 +1,19 @@
+``cassandra.cluster``
+=====================
+
+.. module:: cassandra.cluster
+
+.. autoclass:: Cluster ([contact_points=('127.0.0.1',)][, port=9042][, executor_threads=2], **attr_kwargs)
+   :members:
+   :exclude-members: on_up, on_down, add_host, remove_host, connection_factory
+
+.. autoclass:: Session ()
+   :members:
+   :exclude-members: on_up, on_down, on_add, on_remove, add_host, prepare_on_all_hosts, submit
+
+.. autoclass:: ResponseFuture ()
+   :members:
+   :exclude-members: send_request
+
+.. autoexception:: NoHostAvailable ()
+   :members:
diff --git a/docs/api/index.rst b/docs/api/index.rst
new file mode 100644
index 00000000..b9258d44
--- /dev/null
+++ b/docs/api/index.rst
@@ -0,0 +1,17 @@
+API Documentation
+=================
+
+Cassandra Modules
+-----------------
+
+.. toctree::
+   :maxdepth: 2
+
+   cassandra
+   cassandra/cluster
+   cassandra/metadata
+   cassandra/query
+   cassandra/pool
+   cassandra/connection
+   cassandra/io/asyncorereactor
+   cassandra/io/pyevreactor
diff --git a/docs/conf.py b/docs/conf.py
index c5253e7b..d73b7c91 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -11,12 +11,14 @@
 # All configuration values have a default; values that are commented out
 # serve to show the default.
 
-import sys, os
+import os
+import sys
 
 # If extensions (or modules to document with autodoc) are in another directory,
 # add these directories to sys.path here. If the directory is relative to the
 # documentation root, use os.path.abspath to make it absolute, like shown here.
-#sys.path.insert(0, os.path.abspath('.'))
+sys.path.insert(0, os.path.abspath('..'))
+import cassandra
 
 # -- General configuration -----------------------------------------------------
 
@@ -48,9 +50,12 @@ copyright = u'2013, Tyler Hobbs'
 # built documents.
 #
 # The short X.Y version.
-version = '0.1.4'
+version = cassandra.__version__
 # The full version, including alpha/beta/rc tags.
-release = '0.1.4'
+release = cassandra.__version__
+
+autodoc_member_order = 'bysource'
+autoclass_content = 'both'
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.
@@ -91,7 +96,7 @@ pygments_style = 'sphinx'
 
 # The theme to use for HTML and HTML Help pages. See the documentation for
 # a list of builtin themes.
-html_theme = 'default'
+html_theme = 'sphinxdoc'
 
 # Theme options are theme-specific and customize the look and feel of a theme
 # further. For a list of options available for each theme, see the
diff --git a/docs/index.rst b/docs/index.rst
index 194f4ccb..9571e310 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,17 +1,14 @@
-.. Cassandra Driver documentation master file, created by
-   sphinx-quickstart on Mon Jul 1 11:40:09 2013.
-   You can adapt this file completely to your liking, but it should at least
-   contain the root `toctree` directive.
-
-Welcome to Cassandra Driver's documentation!
-============================================
+Python Cassandra Driver
+=======================
 
 Contents:
 
 .. toctree::
    :maxdepth: 2
 
-Indices and tables
+   api/index
+
+Indices and Tables
 ==================
 
 * :ref:`genindex`
diff --git a/tests/unit/test_host_connection_pool.py b/tests/unit/test_host_connection_pool.py
index 281fa3bd..963762db 100644
--- a/tests/unit/test_host_connection_pool.py
+++ b/tests/unit/test_host_connection_pool.py
@@ -20,10 +20,10 @@ class HostConnectionPoolTests(unittest.TestCase):
         host = Mock(spec=Host, address='ip1')
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
 
         pool = HostConnectionPool(host, HostDistance.LOCAL, session)
-        session.cluster._connection_factory.assert_called_once_with(host.address)
+        session.cluster.connection_factory.assert_called_once_with(host.address)
 
         c = pool.borrow_connection(timeout=0.01)
         self.assertIs(c, conn)
@@ -38,10 +38,10 @@ class HostConnectionPoolTests(unittest.TestCase):
         host = Mock(spec=Host, address='ip1')
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
 
         pool = HostConnectionPool(host, HostDistance.LOCAL, session)
-        session.cluster._connection_factory.assert_called_once_with(host.address)
+        session.cluster.connection_factory.assert_called_once_with(host.address)
 
         pool.borrow_connection(timeout=0.01)
         self.assertEqual(1, conn.in_flight)
@@ -56,10 +56,10 @@ class HostConnectionPoolTests(unittest.TestCase):
         host = Mock(spec=Host, address='ip1')
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
 
         pool = HostConnectionPool(host, HostDistance.LOCAL, session)
-        session.cluster._connection_factory.assert_called_once_with(host.address)
+        session.cluster.connection_factory.assert_called_once_with(host.address)
 
         pool.borrow_connection(timeout=0.01)
         self.assertEqual(1, conn.in_flight)
@@ -80,7 +80,7 @@ class HostConnectionPoolTests(unittest.TestCase):
         host = Mock(spec=Host, address='ip1')
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
         session.cluster.get_core_connections_per_host.return_value = 1
 
         # manipulate the core connection setting so that we can
@@ -121,13 +121,13 @@ class HostConnectionPoolTests(unittest.TestCase):
         host = Mock(spec=Host, address='ip1')
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
 
         # core conns = 1, max conns = 2
         session.cluster.get_max_connections_per_host.return_value = 2
 
         pool = HostConnectionPool(host, HostDistance.LOCAL, session)
-        session.cluster._connection_factory.assert_called_once_with(host.address)
+        session.cluster.connection_factory.assert_called_once_with(host.address)
 
         pool.borrow_connection(timeout=0.01)
         self.assertEqual(1, conn.in_flight)
@@ -145,10 +145,10 @@ class HostConnectionPoolTests(unittest.TestCase):
         host = Mock(spec=Host, address='ip1')
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
 
         pool = HostConnectionPool(host, HostDistance.LOCAL, session)
-        session.cluster._connection_factory.assert_called_once_with(host.address)
+        session.cluster.connection_factory.assert_called_once_with(host.address)
 
         pool.borrow_connection(timeout=0.01)
         conn.is_defunct = True
@@ -165,10 +165,10 @@ class HostConnectionPoolTests(unittest.TestCase):
         host.monitor = Mock(spec=HealthMonitor)
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
 
         pool = HostConnectionPool(host, HostDistance.LOCAL, session)
-        session.cluster._connection_factory.assert_called_once_with(host.address)
+        session.cluster.connection_factory.assert_called_once_with(host.address)
 
         pool.borrow_connection(timeout=0.01)
         conn.is_defunct = True
@@ -185,10 +185,10 @@ class HostConnectionPoolTests(unittest.TestCase):
         host = Mock(spec=Host, address='ip1')
         session = self.make_session()
         conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=True)
-        session.cluster._connection_factory.return_value = conn
+        session.cluster.connection_factory.return_value = conn
 
         pool = HostConnectionPool(host, HostDistance.LOCAL, session)
-        session.cluster._connection_factory.assert_called_once_with(host.address)
+        session.cluster.connection_factory.assert_called_once_with(host.address)
 
         pool.borrow_connection(timeout=0.01)
         conn.is_closed = True
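
Usage sketch (illustrative only, not part of the patch above): the snippet below shows how the APIs documented in this diff fit together, assuming the signatures as documented for Cluster, Session.prepare(), Session.execute_async(), and ResponseFuture.add_callbacks(). The keyspace, table, and column names ("mykeyspace", "users", etc.) are invented for the example.

    # Illustrative sketch; names such as "mykeyspace" and "users" are assumptions.
    import logging
    import time

    from cassandra.cluster import Cluster

    log = logging.getLogger(__name__)

    # One Cluster instance per Cassandra cluster the application talks to.
    cluster = Cluster(contact_points=('127.0.0.1',), port=9042)
    session = cluster.connect("mykeyspace")

    # Prepared statements are tracked by the Cluster and re-prepared on new hosts.
    insert = session.prepare("INSERT INTO users (id, name, age) VALUES (?, ?, ?)")
    session.execute(insert.bind((1, "jane", 35)))

    # Asynchronous execution with callback/errback delivery.
    future = session.execute_async("SELECT * FROM users")

    def handle_rows(rows, started):
        for row in rows:
            log.info("Row: %s", row)
        log.info("Query took %.3f seconds", time.time() - started)

    def handle_error(exc):
        log.error("Query failed: %s", exc)

    future.add_callbacks(handle_rows, handle_error, callback_args=(time.time(),))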