From dee2996e769938ec32173d9eebe26af59b77dff7 Mon Sep 17 00:00:00 2001 From: Irina Kaplounova Date: Thu, 7 Aug 2014 09:51:05 -0700 Subject: [PATCH] Test for PYTHON-91 --- tests/integration/__init__.py | 14 ++++- tests/integration/standard/test_query.py | 71 +++++++++++++++++++++++- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index f5f189a7..1cc68c52 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -179,7 +179,7 @@ def setup_test_keyspace(): try: results = session.execute("SELECT keyspace_name FROM system.schema_keyspaces") existing_keyspaces = [row[0] for row in results] - for ksname in ('test1rf', 'test2rf', 'test3rf'): + for ksname in ('test1rf', 'test2rf', 'test3rf', 'lightw'): if ksname in existing_keyspaces: session.execute("DROP KEYSPACE %s" % ksname) @@ -203,6 +203,18 @@ def setup_test_keyspace(): k int PRIMARY KEY, v int )''' session.execute(ddl) + + ddl = ''' + CREATE KEYSPACE lightw + WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}''' + session.execute(ddl) + + ddl = ''' + CREATE TABLE lightw.test ( + k int PRIMARY KEY, + v int )''' + session.execute(ddl) + except Exception: traceback.print_exc() raise diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index e493a700..f79b607d 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -11,13 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os + +from cassandra.concurrent import execute_concurrent + try: import unittest2 as unittest except ImportError: - import unittest # noqa + import unittest # noqa -from cassandra import ConsistencyLevel +from cassandra import ConsistencyLevel, WriteTimeout from cassandra.query import (PreparedStatement, BoundStatement, SimpleStatement, BatchStatement, BatchType, dict_factory) from cassandra.cluster import Cluster @@ -352,3 +356,66 @@ class SerialConsistencyTests(unittest.TestCase): statement = SimpleStatement("foo") self.assertRaises(ValueError, setattr, statement, 'serial_consistency_level', ConsistencyLevel.ONE) self.assertRaises(ValueError, SimpleStatement, 'foo', serial_consistency_level=ConsistencyLevel.ONE) + + +class LightweightTransactionsTests(unittest.TestCase): + + def setUp(self): + """ + Test is skipped if run with cql version < 2 + + """ + if PROTOCOL_VERSION < 2: + raise unittest.SkipTest( + "Protocol 2.0+ is required for Lightweight transactions, currently testing against %r" + % (PROTOCOL_VERSION,)) + + self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + self.session = self.cluster.connect() + + + def tearDown(self): + """ + Shutdown cluster + """ + self.cluster.shutdown() + + + def test_no_connection_refused_on_timeout(self): + """ + Test for PYTHON-91 "Connection closed after LWT timeout" + Verifies that connection to the cluster is not shut down when timeout occurs. + Number of iterations can be specified with LWT_ITERATIONS environment variable. + Default value is 1000 + """ + ok = True + insert_statement = self.session.prepare("INSERT INTO lightw.test (k, v) VALUES (0, 0) IF NOT EXISTS") + delete_statement = self.session.prepare("DELETE FROM lightw.test WHERE k = 0 IF EXISTS") + + iterations = int(os.getenv("LWT_ITERATIONS", 1000)) + print("Started test for %d iterations" % iterations) + + # Prepare series of parallel statements + statements_and_params = [] + for i in range(iterations): + statements_and_params.append((insert_statement, ())) + statements_and_params.append((delete_statement, ())) + + results = execute_concurrent(self.session, statements_and_params, raise_on_first_error=False) + for (success, result) in results: + if success: + continue + # In this case result is an exception + if type(result).__name__ == "NoHostAvailable": + print("PYTHON-91: Disconnected from Cassandra: %s" % result.message) + ok = False + break + if type(result).__name__ == "WriteTimeout": + print("Timeout: %s" % result.message) + continue + ok = False + print("Unexpected exception %s: %s" % (type(result).__name__, result.message)) + break + + # Make sure test passed + self.assertTrue(ok) \ No newline at end of file