quiescent pool state tests
tests for in_flight == 0 after various cluster operations related to PYTHON-195 also changes pool.get_state to return a dict of properties (instead of a string) for easier testing
This commit is contained in:
@@ -366,9 +366,10 @@ class HostConnection(object):
|
||||
return [c] if c else []
|
||||
|
||||
def get_state(self):
|
||||
have_conn = self._connection is not None
|
||||
in_flight = self._connection.in_flight if have_conn else 0
|
||||
return "shutdown: %s, open: %s, in_flights: %s" % (self.is_shutdown, have_conn, in_flight)
|
||||
connection = self._connection
|
||||
open_count = 1 if connection and not (connection.is_closed or connection.is_defunct) else 0
|
||||
in_flights = [connection.in_flight] if connection else []
|
||||
return {'shutdown': self.is_shutdown, 'open_count': open_count, 'in_flights': in_flights}
|
||||
|
||||
|
||||
_MAX_SIMULTANEOUS_CREATION = 1
|
||||
@@ -703,5 +704,5 @@ class HostConnectionPool(object):
|
||||
return self._connections
|
||||
|
||||
def get_state(self):
|
||||
in_flights = ", ".join([str(c.in_flight) for c in self._connections])
|
||||
return "shutdown: %s, open_count: %d, in_flights: %s" % (self.is_shutdown, self.open_count, in_flights)
|
||||
in_flights = [c.in_flight for c in self._connections]
|
||||
return {'shutdown': self.is_shutdown, 'open_count': self.open_count, 'in_flights': in_flights}
|
||||
|
||||
@@ -31,6 +31,7 @@ from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
|
||||
from cassandra.query import SimpleStatement, TraceUnavailable
|
||||
|
||||
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions
|
||||
from tests.integration.util import assert_quiescent_pool_state
|
||||
|
||||
|
||||
def setup_module():
|
||||
@@ -495,6 +496,10 @@ class ClusterTests(unittest.TestCase):
|
||||
self.assertIn(cluster.control_connection, holders)
|
||||
self.assertEqual(len(holders), len(cluster.metadata.all_hosts()) + 1) # hosts pools, 1 for cc
|
||||
|
||||
cluster._idle_heartbeat.stop()
|
||||
cluster._idle_heartbeat.join()
|
||||
assert_quiescent_pool_state(self, cluster)
|
||||
|
||||
session.shutdown()
|
||||
|
||||
@patch('cassandra.cluster.Cluster.idle_heartbeat_interval', new=0.1)
|
||||
@@ -515,3 +520,39 @@ class ClusterTests(unittest.TestCase):
|
||||
self.assertFalse(any(c.is_idle for c in connections))
|
||||
|
||||
session.shutdown()
|
||||
|
||||
def test_pool_management(self):
|
||||
# Ensure that in_flight and request_ids quiesce after cluster operations
|
||||
cluster = Cluster(protocol_version=PROTOCOL_VERSION, idle_heartbeat_interval=0) # no idle heartbeat here, pool management is tested in test_idle_heartbeat
|
||||
session = cluster.connect()
|
||||
session2 = cluster.connect()
|
||||
|
||||
# prepare
|
||||
p = session.prepare("SELECT * FROM system.local WHERE key=?")
|
||||
self.assertTrue(session.execute(p, ('local',)))
|
||||
|
||||
# simple
|
||||
self.assertTrue(session.execute("SELECT * FROM system.local WHERE key='local'"))
|
||||
|
||||
# set keyspace
|
||||
session.set_keyspace('system')
|
||||
session.set_keyspace('system_traces')
|
||||
|
||||
# use keyspace
|
||||
session.execute('USE system')
|
||||
session.execute('USE system_traces')
|
||||
|
||||
# refresh schema
|
||||
cluster.refresh_schema()
|
||||
cluster.refresh_schema(max_schema_agreement_wait=0)
|
||||
|
||||
# submit schema refresh
|
||||
future = cluster.submit_schema_refresh()
|
||||
future.result()
|
||||
|
||||
assert_quiescent_pool_state(self, cluster)
|
||||
|
||||
session2.shutdown()
|
||||
del session2
|
||||
assert_quiescent_pool_state(self, cluster)
|
||||
session.shutdown()
|
||||
|
||||
24
tests/integration/util.py
Normal file
24
tests/integration/util.py
Normal file
@@ -0,0 +1,24 @@
|
||||
# Copyright 2013-2014 DataStax, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
def assert_quiescent_pool_state(test_case, cluster):
|
||||
|
||||
for session in cluster.sessions:
|
||||
pool_states = session.get_pool_state().values()
|
||||
test_case.assertTrue(pool_states)
|
||||
|
||||
for state in pool_states:
|
||||
test_case.assertFalse(state['shutdown'])
|
||||
test_case.assertGreater(state['open_count'], 0)
|
||||
test_case.assertTrue(all((i == 0 for i in state['in_flights'])))
|
||||
Reference in New Issue
Block a user