From dee2996e769938ec32173d9eebe26af59b77dff7 Mon Sep 17 00:00:00 2001 From: Irina Kaplounova Date: Thu, 7 Aug 2014 09:51:05 -0700 Subject: [PATCH 1/3] Test for PYTHON-91 --- tests/integration/__init__.py | 14 ++++- tests/integration/standard/test_query.py | 71 +++++++++++++++++++++++- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index f5f189a7..1cc68c52 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -179,7 +179,7 @@ def setup_test_keyspace(): try: results = session.execute("SELECT keyspace_name FROM system.schema_keyspaces") existing_keyspaces = [row[0] for row in results] - for ksname in ('test1rf', 'test2rf', 'test3rf'): + for ksname in ('test1rf', 'test2rf', 'test3rf', 'lightw'): if ksname in existing_keyspaces: session.execute("DROP KEYSPACE %s" % ksname) @@ -203,6 +203,18 @@ def setup_test_keyspace(): k int PRIMARY KEY, v int )''' session.execute(ddl) + + ddl = ''' + CREATE KEYSPACE lightw + WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}''' + session.execute(ddl) + + ddl = ''' + CREATE TABLE lightw.test ( + k int PRIMARY KEY, + v int )''' + session.execute(ddl) + except Exception: traceback.print_exc() raise diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index e493a700..f79b607d 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -11,13 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os + +from cassandra.concurrent import execute_concurrent + try: import unittest2 as unittest except ImportError: - import unittest # noqa + import unittest # noqa -from cassandra import ConsistencyLevel +from cassandra import ConsistencyLevel, WriteTimeout from cassandra.query import (PreparedStatement, BoundStatement, SimpleStatement, BatchStatement, BatchType, dict_factory) from cassandra.cluster import Cluster @@ -352,3 +356,66 @@ class SerialConsistencyTests(unittest.TestCase): statement = SimpleStatement("foo") self.assertRaises(ValueError, setattr, statement, 'serial_consistency_level', ConsistencyLevel.ONE) self.assertRaises(ValueError, SimpleStatement, 'foo', serial_consistency_level=ConsistencyLevel.ONE) + + +class LightweightTransactionsTests(unittest.TestCase): + + def setUp(self): + """ + Test is skipped if run with cql version < 2 + + """ + if PROTOCOL_VERSION < 2: + raise unittest.SkipTest( + "Protocol 2.0+ is required for Lightweight transactions, currently testing against %r" + % (PROTOCOL_VERSION,)) + + self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + self.session = self.cluster.connect() + + + def tearDown(self): + """ + Shutdown cluster + """ + self.cluster.shutdown() + + + def test_no_connection_refused_on_timeout(self): + """ + Test for PYTHON-91 "Connection closed after LWT timeout" + Verifies that connection to the cluster is not shut down when timeout occurs. + Number of iterations can be specified with LWT_ITERATIONS environment variable. + Default value is 1000 + """ + ok = True + insert_statement = self.session.prepare("INSERT INTO lightw.test (k, v) VALUES (0, 0) IF NOT EXISTS") + delete_statement = self.session.prepare("DELETE FROM lightw.test WHERE k = 0 IF EXISTS") + + iterations = int(os.getenv("LWT_ITERATIONS", 1000)) + print("Started test for %d iterations" % iterations) + + # Prepare series of parallel statements + statements_and_params = [] + for i in range(iterations): + statements_and_params.append((insert_statement, ())) + statements_and_params.append((delete_statement, ())) + + results = execute_concurrent(self.session, statements_and_params, raise_on_first_error=False) + for (success, result) in results: + if success: + continue + # In this case result is an exception + if type(result).__name__ == "NoHostAvailable": + print("PYTHON-91: Disconnected from Cassandra: %s" % result.message) + ok = False + break + if type(result).__name__ == "WriteTimeout": + print("Timeout: %s" % result.message) + continue + ok = False + print("Unexpected exception %s: %s" % (type(result).__name__, result.message)) + break + + # Make sure test passed + self.assertTrue(ok) \ No newline at end of file From d994ee3f9bb6069270bd3e6ec480cf118e234568 Mon Sep 17 00:00:00 2001 From: Irina Kaplounova Date: Thu, 7 Aug 2014 11:21:07 -0700 Subject: [PATCH 2/3] Addressing code review comments --- tests/integration/__init__.py | 13 +------------ tests/integration/standard/test_query.py | 11 +++++++++-- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 1cc68c52..b55bf17c 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -179,7 +179,7 @@ def setup_test_keyspace(): try: results = session.execute("SELECT keyspace_name FROM system.schema_keyspaces") existing_keyspaces = [row[0] for row in results] - for ksname in ('test1rf', 'test2rf', 'test3rf', 'lightw'): + for ksname in ('test1rf', 'test2rf', 'test3rf'): if ksname in existing_keyspaces: session.execute("DROP KEYSPACE %s" % ksname) @@ -204,17 +204,6 @@ def setup_test_keyspace(): v int )''' session.execute(ddl) - ddl = ''' - CREATE KEYSPACE lightw - WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}''' - session.execute(ddl) - - ddl = ''' - CREATE TABLE lightw.test ( - k int PRIMARY KEY, - v int )''' - session.execute(ddl) - except Exception: traceback.print_exc() raise diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index f79b607d..01910e1f 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -373,11 +373,18 @@ class LightweightTransactionsTests(unittest.TestCase): self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) self.session = self.cluster.connect() + ddl = ''' + CREATE TABLE test3rf.lwt ( + k int PRIMARY KEY, + v int )''' + self.session.execute(ddl) + def tearDown(self): """ Shutdown cluster """ + self.session.execute("DROP TABLE test3rf.lwt") self.cluster.shutdown() @@ -389,8 +396,8 @@ class LightweightTransactionsTests(unittest.TestCase): Default value is 1000 """ ok = True - insert_statement = self.session.prepare("INSERT INTO lightw.test (k, v) VALUES (0, 0) IF NOT EXISTS") - delete_statement = self.session.prepare("DELETE FROM lightw.test WHERE k = 0 IF EXISTS") + insert_statement = self.session.prepare("INSERT INTO test3rf.lwt (k, v) VALUES (0, 0) IF NOT EXISTS") + delete_statement = self.session.prepare("DELETE FROM test3rf.lwt WHERE k = 0 IF EXISTS") iterations = int(os.getenv("LWT_ITERATIONS", 1000)) print("Started test for %d iterations" % iterations) From cb0b5d0e2790b231611ae4dac7318c7d9a945189 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 7 Aug 2014 13:49:52 -0500 Subject: [PATCH 3/3] Make LWT timeout test require timeouts to succeed. --- tests/integration/standard/test_query.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index 01910e1f..cacda2a2 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -21,7 +21,7 @@ try: except ImportError: import unittest # noqa -from cassandra import ConsistencyLevel, WriteTimeout +from cassandra import ConsistencyLevel from cassandra.query import (PreparedStatement, BoundStatement, SimpleStatement, BatchStatement, BatchType, dict_factory) from cassandra.cluster import Cluster @@ -379,7 +379,6 @@ class LightweightTransactionsTests(unittest.TestCase): v int )''' self.session.execute(ddl) - def tearDown(self): """ Shutdown cluster @@ -387,7 +386,6 @@ class LightweightTransactionsTests(unittest.TestCase): self.session.execute("DROP TABLE test3rf.lwt") self.cluster.shutdown() - def test_no_connection_refused_on_timeout(self): """ Test for PYTHON-91 "Connection closed after LWT timeout" @@ -395,7 +393,6 @@ class LightweightTransactionsTests(unittest.TestCase): Number of iterations can be specified with LWT_ITERATIONS environment variable. Default value is 1000 """ - ok = True insert_statement = self.session.prepare("INSERT INTO test3rf.lwt (k, v) VALUES (0, 0) IF NOT EXISTS") delete_statement = self.session.prepare("DELETE FROM test3rf.lwt WHERE k = 0 IF EXISTS") @@ -408,21 +405,21 @@ class LightweightTransactionsTests(unittest.TestCase): statements_and_params.append((insert_statement, ())) statements_and_params.append((delete_statement, ())) + received_timeout = False results = execute_concurrent(self.session, statements_and_params, raise_on_first_error=False) for (success, result) in results: if success: continue # In this case result is an exception if type(result).__name__ == "NoHostAvailable": - print("PYTHON-91: Disconnected from Cassandra: %s" % result.message) - ok = False + self.fail("PYTHON-91: Disconnected from Cassandra: %s" % result.message) break if type(result).__name__ == "WriteTimeout": print("Timeout: %s" % result.message) + received_timeout = True continue - ok = False - print("Unexpected exception %s: %s" % (type(result).__name__, result.message)) + self.fail("Unexpected exception %s: %s" % (type(result).__name__, result.message)) break # Make sure test passed - self.assertTrue(ok) \ No newline at end of file + self.assertTrue(received_timeout)