diff --git a/tests/integration/standard/test_concurrent.py b/tests/integration/standard/test_concurrent.py index 9a823383..45b73613 100644 --- a/tests/integration/standard/test_concurrent.py +++ b/tests/integration/standard/test_concurrent.py @@ -50,11 +50,11 @@ class ClusterTests(unittest.TestCase): def tearDownClass(cls): cls.cluster.shutdown() - def execute_concurrent_helper(self, session, query): + def execute_concurrent_helper(self, session, query, results_generator=False): count = 0 while count < 100: try: - return execute_concurrent(session, query) + return execute_concurrent(session, query, results_generator=False) except (ReadTimeout, WriteTimeout, OperationTimedOut, ReadFailure, WriteFailure): ex_type, ex, tb = sys.exc_info() log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) @@ -63,11 +63,11 @@ class ClusterTests(unittest.TestCase): raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) - def execute_concurrent_args_helper(self, session, query, params): + def execute_concurrent_args_helper(self, session, query, params, results_generator=False): count = 0 while count < 100: try: - return execute_concurrent_with_args(session, query, params) + return execute_concurrent_with_args(session, query, params, results_generator=results_generator) except (ReadTimeout, WriteTimeout, OperationTimedOut, ReadFailure, WriteFailure): ex_type, ex, tb = sys.exc_info() log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) @@ -120,6 +120,40 @@ class ClusterTests(unittest.TestCase): self.assertEqual(num_statements, len(results)) self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results) + def test_execute_concurrent_with_args_generator(self): + """ + Test to validate that generator based results are surfaced correctly + + Repeatedly inserts data into a a table and attempts to query it. It then validates that the + results are returned in the order expected + + @since 2.7.0 + @jira_ticket PYTHON-123 + @expected_result all data should be returned in order. + + @test_category queries:async + """ + for num_statements in (0, 1, 2, 7, 10, 99, 100, 101, 199, 200, 201): + statement = SimpleStatement( + "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", + consistency_level=ConsistencyLevel.QUORUM) + parameters = [(i, i) for i in range(num_statements)] + + results = self.execute_concurrent_args_helper(self.session, statement, parameters, results_generator=True) + for result in results: + self.assertEqual((True, None), result) + + # read + statement = SimpleStatement( + "SELECT v FROM test3rf.test WHERE k=%s", + consistency_level=ConsistencyLevel.QUORUM) + parameters = [(i, ) for i in range(num_statements)] + + results = self.execute_concurrent_args_helper(self.session, statement, parameters, results_generator=True) + for i in range(num_statements): + result = results.next() + self.assertEqual((True, [(i,)]), result) + def test_execute_concurrent_paged_result(self): if PROTOCOL_VERSION < 2: raise unittest.SkipTest( @@ -150,6 +184,51 @@ class ClusterTests(unittest.TestCase): self.assertIsInstance(result, PagedResult) self.assertEqual(num_statements, sum(1 for _ in result)) + def test_execute_concurrent_paged_result_generator(self): + """ + Test to validate that generator based results are surfaced correctly when paging is used + + Inserts data into a a table and attempts to query it. It then validates that the + results are returned as expected (no order specified) + + @since 2.7.0 + @jira_ticket PYTHON-123 + @expected_result all data should be returned in order. + + @test_category paging + """ + if PROTOCOL_VERSION < 2: + raise unittest.SkipTest( + "Protocol 2+ is required for Paging, currently testing against %r" + % (PROTOCOL_VERSION,)) + + num_statements = 201 + statement = SimpleStatement( + "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", + consistency_level=ConsistencyLevel.QUORUM) + parameters = [(i, i) for i in range(num_statements)] + + results = self.execute_concurrent_args_helper(self.session, statement, parameters, results_generator=True) + self.assertEqual(num_statements, sum(1 for _ in results)) + + # read + statement = SimpleStatement( + "SELECT * FROM test3rf.test LIMIT %s", + consistency_level=ConsistencyLevel.QUORUM, + fetch_size=int(num_statements / 2)) + parameters = [(i, ) for i in range(num_statements)] + + paged_results_gen = self.execute_concurrent_args_helper(self.session, statement, [(num_statements,)], results_generator=True) + + # iterate over all the result and make sure we find the correct number. + found_results = 0 + for result_tuple in paged_results_gen: + paged_result = result_tuple[1] + for _ in paged_result: + found_results += 1 + + self.assertEqual(found_results, num_statements) + def test_first_failure(self): statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", )) parameters = [(i, i) for i in range(100)] diff --git a/tests/unit/test_concurrent.py b/tests/unit/test_concurrent.py new file mode 100644 index 00000000..5e23f584 --- /dev/null +++ b/tests/unit/test_concurrent.py @@ -0,0 +1,227 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa +from itertools import cycle +from mock import Mock +import time +import threading +from six.moves.queue import PriorityQueue + +from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args + + +class MockResponseResponseFuture(): + """ + This is a mock ResponseFuture. It is used to allow us to hook into the underlying session + and invoke callback with various timing. + """ + + # a list pending callbacks, these will be prioritized in reverse or normal orderd + pending_callbacks = PriorityQueue() + + def __init__(self, reverse): + + # if this is true invoke callback in the reverse order then what they were insert + self.reverse = reverse + # hardcoded to avoid paging logic + self.has_more_pages = False + + if(reverse): + self.priority = 100 + else: + self.priority = 0 + + def add_callback(self, fn, *args, **kwargs): + """ + This is used to add a callback our pending list of callbacks. + If reverse is specified we will invoke the callback in the opposite order that we added it + """ + time_added = time.time() + self.pending_callbacks.put((self.priority, (fn, args, kwargs, time_added))) + if not reversed: + self.priority += 1 + else: + self.priority -= 1 + + def add_callbacks(self, callback, errback, + callback_args=(), callback_kwargs=None, + errback_args=(), errback_kwargs=None): + + self.add_callback(callback, *callback_args, **(callback_kwargs or {})) + + def get_next_callback(self): + return self.pending_callbacks.get() + + def has_next_callback(self): + return not self.pending_callbacks.empty() + + def has_more_pages(self): + return False + + def clear_callbacks(self): + return + + +class TimedCallableInvoker(threading.Thread): + """ + This is a local thread which is runs and invokes all the callbacks on the pending callback queue. + The slowdown flag can used to invoke random slowdowns in our simulate queries. + """ + def __init__(self, handler, slowdown=False): + super(TimedCallableInvoker, self).__init__() + self.slowdown = slowdown + self._stop = threading.Event() + self.handler = handler + + def stop(self): + self._stop.set() + + def stopped(self): + return self._stop.isSet() + + def run(self): + while(not self.stopped()): + if(self.handler.has_next_callback()): + pending_callback = self.handler.get_next_callback() + priority_num = pending_callback[0] + if (priority_num % 10) == 0 and self.slowdown: + self._stop.wait(.1) + callback_args = pending_callback[1] + fn, args, kwargs, time_added = callback_args + fn(time_added, *args, **kwargs) + self._stop.wait(.001) + return + + +class ConcurrencyTest((unittest.TestCase)): + + def test_results_ordering_forward(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorListResults + when queries complete in the order they were executed. + """ + self.insert_and_validate_list_results(False, False) + + def test_results_ordering_reverse(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorListResults + when queries complete in the reverse order they were executed. + """ + self.insert_and_validate_list_results(True, False) + + def test_results_ordering_forward_slowdown(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorListResults + when queries complete in the order they were executed, with slow queries mixed in. + """ + self.insert_and_validate_list_results(False, True) + + def test_results_ordering_reverse_slowdown(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorListResults + when queries complete in the reverse order they were executed, with slow queries mixed in. + """ + self.insert_and_validate_list_results(True, True) + + def test_results_ordering_forward_generator(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorGenResults + when queries complete in the order they were executed. + """ + self.insert_and_validate_list_generator(False, False) + + def test_results_ordering_reverse_generator(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorGenResults + when queries complete in the reverse order they were executed. + """ + self.insert_and_validate_list_generator(True, False) + + def test_results_ordering_forward_generator_slowdown(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorGenResults + when queries complete in the order they were executed, with slow queries mixed in. + """ + self.insert_and_validate_list_generator(False, True) + + def test_results_ordering_reverse_generator_slowdown(self): + """ + This tests the ordering of our various concurrent generator class ConcurrentExecutorGenResults + when queries complete in the reverse order they were executed, with slow queries mixed in. + """ + self.insert_and_validate_list_generator(True, True) + + def insert_and_validate_list_results(self, reverse, slowdown): + """ + This utility method will execute submit various statements for execution using the ConcurrentExecutorListResults, + then invoke a separate thread to execute the callback associated with the futures registered + for those statements. The parameters will toggle various timing, and ordering changes. + Finally it will validate that the results were returned in the order they were submitted + :param reverse: Execute the callbacks in the opposite order that they were submitted + :param slowdown: Cause intermittent queries to perform slowly + """ + our_handler = MockResponseResponseFuture(reverse=reverse) + mock_session = Mock() + statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]), + [(i, ) for i in range(100)]) + mock_session.execute_async.return_value = our_handler + + t = TimedCallableInvoker(our_handler, slowdown=slowdown) + t.start() + results = execute_concurrent(mock_session, statements_and_params) + + while(not our_handler.pending_callbacks.empty()): + time.sleep(.01) + t.stop() + self.validate_result_ordering(results) + + def insert_and_validate_list_generator(self, reverse, slowdown): + """ + This utility method will execute submit various statements for execution using the ConcurrentExecutorGenResults, + then invoke a separate thread to execute the callback associated with the futures registered + for those statements. The parameters will toggle various timing, and ordering changes. + Finally it will validate that the results were returned in the order they were submitted + :param reverse: Execute the callbacks in the opposite order that they were submitted + :param slowdown: Cause intermittent queries to perform slowly + """ + our_handler = MockResponseResponseFuture(reverse=reverse) + mock_session = Mock() + statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]), + [(i, ) for i in range(100)]) + mock_session.execute_async.return_value = our_handler + + t = TimedCallableInvoker(our_handler, slowdown=slowdown) + t.start() + results = execute_concurrent(mock_session, statements_and_params, results_generator=True) + + self.validate_result_ordering(results) + t.stop() + + def validate_result_ordering(self, results): + """ + This method will validate that the timestamps returned from the result are in order. This indicates that the + results were returned in the order they were submitted for execution + :param results: + """ + last_time_added = 0 + for result in results: + current_time_added = result[1] + self.assertLess(last_time_added, current_time_added) + last_time_added = current_time_added +