From f7ef27e006c72049fd99a25db61f8a31f1997907 Mon Sep 17 00:00:00 2001 From: GregBestland Date: Thu, 2 Jul 2015 18:02:29 -0500 Subject: [PATCH] Adding timer tests for async, libev, and twisted. Added integration tests. All for PYTHON-108 --- tests/__init__.py | 20 ++-- tests/integration/long/test_failure_types.py | 90 +++++++++++++++- tests/unit/io/eventlet_utils.py | 37 +++++++ tests/unit/io/gevent_utils.py | 56 ++++++++++ tests/unit/io/test_asyncorereactor.py | 40 ++++++- tests/unit/io/test_eventletreactor.py | 67 ++++++++++++ tests/unit/io/test_geventreactor.py | 78 ++++++++++++++ tests/unit/io/test_libevreactor.py | 9 +- tests/unit/io/test_libevtimer.py | 82 ++++++++++++++ tests/unit/io/test_twistedreactor.py | 53 ++++++++- tests/unit/io/utils.py | 107 +++++++++++++++++++ tox.ini | 13 ++- 12 files changed, 633 insertions(+), 19 deletions(-) create mode 100644 tests/unit/io/eventlet_utils.py create mode 100644 tests/unit/io/gevent_utils.py create mode 100644 tests/unit/io/test_eventletreactor.py create mode 100644 tests/unit/io/test_geventreactor.py create mode 100644 tests/unit/io/test_libevtimer.py create mode 100644 tests/unit/io/utils.py diff --git a/tests/__init__.py b/tests/__init__.py index e1c1e54e..150e600c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -14,6 +14,7 @@ import logging import sys +import socket log = logging.getLogger() log.setLevel('DEBUG') @@ -24,15 +25,18 @@ if not log.handlers: log.addHandler(handler) -def is_gevent_monkey_patched(): - return 'gevent.monkey' in sys.modules - - def is_eventlet_monkey_patched(): - if 'eventlet.patcher' in sys.modules: - import eventlet - return eventlet.patcher.is_monkey_patched('socket') - return False + if 'eventlet.patcher' not in sys.modules: + return False + import eventlet.patcher + return eventlet.patcher.is_monkey_patched('socket') + + +def is_gevent_monkey_patched(): + if 'gevent.monkey' not in sys.modules: + return False + import gevent.socket + return socket.socket is gevent.socket.socket def is_monkey_patched(): diff --git a/tests/integration/long/test_failure_types.py b/tests/integration/long/test_failure_types.py index 30e05b60..e611adca 100644 --- a/tests/integration/long/test_failure_types.py +++ b/tests/integration/long/test_failure_types.py @@ -12,14 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys, logging, traceback +import sys,logging, traceback, time from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,\ FunctionFailure from cassandra.cluster import Cluster from cassandra.concurrent import execute_concurrent_with_args from cassandra.query import SimpleStatement -from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster +from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node +from mock import Mock try: import unittest2 as unittest @@ -301,3 +302,88 @@ class ClientExceptionTests(unittest.TestCase): """ DROP TABLE test3rf.d; """, consistency_level=ConsistencyLevel.ALL, expected_exception=None) + + +class TimeoutTimerTest(unittest.TestCase): + def setUp(self): + """ + Setup sessions and pause node1 + """ + self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + self.session = self.cluster.connect() + + # self.node1, self.node2, self.node3 = get_cluster().nodes.values() + self.node1 = get_node(1) + self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + self.session = self.cluster.connect() + + ddl = ''' + CREATE TABLE test3rf.timeout ( + k int PRIMARY KEY, + v int )''' + self.session.execute(ddl) + self.node1.pause() + + def tearDown(self): + """ + Shutdown cluster and resume node1 + """ + self.node1.resume() + self.session.execute("DROP TABLE test3rf.timeout") + self.cluster.shutdown() + + def test_async_timeouts(self): + """ + Test to validate that timeouts are honored + + + Exercise the underlying timeouts, by attempting a query that will timeout. Ensure the default timeout is still + honored. Make sure that user timeouts are also honored. + + @since 2.7.0 + @jira_ticket PYTHON-108 + @expected_result timeouts should be honored + + @test_category + + """ + + # Because node1 is stopped these statements will all timeout + ss = SimpleStatement('SELECT * FROM test3rf.test', consistency_level=ConsistencyLevel.ALL) + + # Test with default timeout (should be 10) + start_time = time.time() + future = self.session.execute_async(ss) + with self.assertRaises(OperationTimedOut): + future.result() + end_time = time.time() + total_time = end_time-start_time + expected_time = self.session.default_timeout + # check timeout and ensure it's within a reasonable range + self.assertAlmostEqual(expected_time, total_time, delta=.05) + + # Test with user defined timeout (Should be 1) + start_time = time.time() + future = self.session.execute_async(ss, timeout=1) + mock_callback = Mock(return_value=None) + mock_errorback = Mock(return_value=None) + future.add_callback(mock_callback) + future.add_errback(mock_errorback) + + with self.assertRaises(OperationTimedOut): + future.result() + end_time = time.time() + total_time = end_time-start_time + expected_time = 1 + # check timeout and ensure it's within a reasonable range + self.assertAlmostEqual(expected_time, total_time, delta=.05) + self.assertTrue(mock_errorback.called) + self.assertFalse(mock_callback.called) + + + + + + + + diff --git a/tests/unit/io/eventlet_utils.py b/tests/unit/io/eventlet_utils.py new file mode 100644 index 00000000..8aee030f --- /dev/null +++ b/tests/unit/io/eventlet_utils.py @@ -0,0 +1,37 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import select +import socket +import thread +import Queue +import threading +import __builtin__ +import ssl +import time + + +def eventlet_un_patch_all(): + """ + A method to unpatch eventlet monkey patching used for the reactor tests + """ + + # These are the modules that are loaded by eventlet we reload them all + modules_to_unpatch = [os, select, socket, thread, time, Queue, threading, ssl, __builtin__] + for to_unpatch in modules_to_unpatch: + reload(to_unpatch) + + diff --git a/tests/unit/io/gevent_utils.py b/tests/unit/io/gevent_utils.py new file mode 100644 index 00000000..1c6e27ad --- /dev/null +++ b/tests/unit/io/gevent_utils.py @@ -0,0 +1,56 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from gevent import monkey + + +def gevent_un_patch_all(): + """ + A method to unpatch gevent libraries. These are unloaded + in the same order that gevent monkey patch loads theirs. + Order cannot be arbitrary. This is used in the unit tests to + un monkey patch gevent + """ + restore_saved_module("os") + restore_saved_module("time") + restore_saved_module("thread") + restore_saved_module("threading") + restore_saved_module("_threading_local") + restore_saved_module("stdin") + restore_saved_module("stdout") + restore_saved_module("socket") + restore_saved_module("select") + restore_saved_module("ssl") + restore_saved_module("subprocess") + + +def restore_saved_module(module): + """ + gevent monkey patch keeps a list of all patched modules. + This will restore the original ones + :param module: to unpatch + :return: + """ + + # Check the saved attributes in geven monkey patch + if not (module in monkey.saved): + return + _module = __import__(module) + + # If it exist unpatch it + for attr in monkey.saved[module]: + if hasattr(_module, attr): + setattr(_module, attr, monkey.saved[module][attr]) + diff --git a/tests/unit/io/test_asyncorereactor.py b/tests/unit/io/test_asyncorereactor.py index 9d97f468..16fbf2fb 100644 --- a/tests/unit/io/test_asyncorereactor.py +++ b/tests/unit/io/test_asyncorereactor.py @@ -21,20 +21,20 @@ except ImportError: import errno import math +import time from mock import patch, Mock import os from six import BytesIO import socket from socket import error as socket_error - from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT, - ConnectionException, ProtocolError) + ConnectionException, ProtocolError,Timer) from cassandra.io.asyncorereactor import AsyncoreConnection from cassandra.protocol import (write_stringmultimap, write_int, write_string, SupportedMessage, ReadyMessage, ServerError) from cassandra.marshal import uint8_pack, uint32_pack, int32_pack - from tests import is_monkey_patched +from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback class AsyncoreConnectionTest(unittest.TestCase): @@ -300,3 +300,37 @@ class AsyncoreConnectionTest(unittest.TestCase): self.assertTrue(c.connected_event.is_set()) self.assertFalse(c.is_defunct) + + def test_multi_timer_validation(self, *args): + """ + Verify that timer timeouts are honored appropriately + """ + c = self.make_connection() + # Tests timers submitted in order at various timeouts + submit_and_wait_for_completion(self, AsyncoreConnection, 0, 100, 1, 100) + # Tests timers submitted in reverse order at various timeouts + submit_and_wait_for_completion(self, AsyncoreConnection, 100, 0, -1, 100) + # Tests timers submitted in varying order at various timeouts + submit_and_wait_for_completion(self, AsyncoreConnection, 0, 100, 1, 100, True) + + def test_timer_cancellation(self): + """ + Verify that timer cancellation is honored + """ + + # Various lists for tracking callback stage + connection = self.make_connection() + timeout = .1 + callback = TimerCallback(timeout) + timer = connection.create_timer(timeout, callback.invoke) + timer.cancel() + # Release context allow for timer thread to run. + time.sleep(.2) + timer_manager = connection._loop._timers + # Assert that the cancellation was honored + self.assertFalse(timer_manager._queue) + self.assertFalse(timer_manager._new_timers) + self.assertFalse(callback.was_invoked()) + + + diff --git a/tests/unit/io/test_eventletreactor.py b/tests/unit/io/test_eventletreactor.py new file mode 100644 index 00000000..9f071d3b --- /dev/null +++ b/tests/unit/io/test_eventletreactor.py @@ -0,0 +1,67 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa + +from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback +from tests import is_eventlet_monkey_patched +import time + +try: + from cassandra.io.eventletreactor import EventletConnection +except ImportError: + EventletConnection = None # noqa + + +class EventletTimerTest(unittest.TestCase): + + def setUp(self): + if EventletConnection is None: + raise unittest.SkipTest("Eventlet libraries not available") + if not is_eventlet_monkey_patched(): + raise unittest.SkipTest("Can't test eventlet without monkey patching") + EventletConnection.initialize_reactor() + + def test_multi_timer_validation(self, *args): + """ + Verify that timer timeouts are honored appropriately + """ + # Tests timers submitted in order at various timeouts + submit_and_wait_for_completion(self, EventletConnection, 0, 100, 1, 100) + # Tests timers submitted in reverse order at various timeouts + submit_and_wait_for_completion(self, EventletConnection, 100, 0, -1, 100) + # Tests timers submitted in varying order at various timeouts + submit_and_wait_for_completion(self, EventletConnection, 0, 100, 1, 100, True) + + def test_timer_cancellation(self): + """ + Verify that timer cancellation is honored + """ + + # Various lists for tracking callback stage + timeout = .1 + callback = TimerCallback(timeout) + timer = EventletConnection.create_timer(timeout, callback.invoke) + timer.cancel() + # Release context allow for timer thread to run. + time.sleep(.2) + timer_manager = EventletConnection._timers + # Assert that the cancellation was honored + self.assertFalse(timer_manager._queue) + self.assertFalse(timer_manager._new_timers) + self.assertFalse(callback.was_invoked()) \ No newline at end of file diff --git a/tests/unit/io/test_geventreactor.py b/tests/unit/io/test_geventreactor.py new file mode 100644 index 00000000..c90be37d --- /dev/null +++ b/tests/unit/io/test_geventreactor.py @@ -0,0 +1,78 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa + +import time +from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback +from tests import is_gevent_monkey_patched + +try: + from cassandra.io.geventreactor import GeventConnection + import gevent.monkey + from gevent_utils import gevent_un_patch_all +except ImportError: + GeventConnection = None # noqa + + +class GeventTimerTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + if GeventConnection is not None: + if not is_gevent_monkey_patched(): + gevent.monkey.patch_all() + + @classmethod + def tearDownClass(cls): + if is_gevent_monkey_patched(): + gevent_un_patch_all() + + def setUp(self): + if not is_gevent_monkey_patched(): + raise unittest.SkipTest("Can't test gevent without monkey patching") + GeventConnection.initialize_reactor() + + def test_multi_timer_validation(self, *args): + """ + Verify that timer timeouts are honored appropriately + """ + + # Tests timers submitted in order at various timeouts + submit_and_wait_for_completion(self, GeventConnection, 0, 100, 1, 100) + # Tests timers submitted in reverse order at various timeouts + submit_and_wait_for_completion(self, GeventConnection, 100, 0, -1, 100) + # Tests timers submitted in varying order at various timeouts + submit_and_wait_for_completion(self, GeventConnection, 0, 100, 1, 100, True), + + def test_timer_cancellation(self): + """ + Verify that timer cancellation is honored + """ + + # Various lists for tracking callback stage + timeout = .1 + callback = TimerCallback(timeout) + timer = GeventConnection.create_timer(timeout, callback.invoke) + timer.cancel() + # Release context allow for timer thread to run. + time.sleep(.2) + timer_manager = GeventConnection._timers + # Assert that the cancellation was honored + self.assertFalse(timer_manager._queue) + self.assertFalse(timer_manager._new_timers) + self.assertFalse(callback.was_invoked()) diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index 334480ef..2b26343a 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -24,6 +24,7 @@ import six from six import BytesIO from socket import error as socket_error import sys +import time from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT, ConnectionException, ProtocolError) @@ -31,6 +32,10 @@ from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT, from cassandra.protocol import (write_stringmultimap, write_int, write_string, SupportedMessage, ReadyMessage, ServerError) from cassandra.marshal import uint8_pack, uint32_pack, int32_pack +from tests.unit.io.utils import TimerCallback +from tests.unit.io.utils import submit_and_wait_for_completion +from tests import is_monkey_patched + try: from cassandra.io.libevreactor import LibevConnection @@ -46,7 +51,7 @@ except ImportError: class LibevConnectionTest(unittest.TestCase): def setUp(self): - if 'gevent.monkey' in sys.modules: + if is_monkey_patched(): raise unittest.SkipTest("Can't test libev with monkey patching") if LibevConnection is None: raise unittest.SkipTest('libev does not appear to be installed correctly') @@ -290,4 +295,4 @@ class LibevConnectionTest(unittest.TestCase): c.handle_read(None, 0) self.assertTrue(c.connected_event.is_set()) - self.assertFalse(c.is_defunct) + self.assertFalse(c.is_defunct) \ No newline at end of file diff --git a/tests/unit/io/test_libevtimer.py b/tests/unit/io/test_libevtimer.py new file mode 100644 index 00000000..988282c2 --- /dev/null +++ b/tests/unit/io/test_libevtimer.py @@ -0,0 +1,82 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa + + +from mock import patch, Mock + +import time + +from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback +from tests import is_monkey_patched + + +try: + from cassandra.io.libevreactor import LibevConnection +except ImportError: + LibevConnection = None # noqa + + +@patch('socket.socket') +@patch('cassandra.io.libevwrapper.IO') +class LibevTimerTest(unittest.TestCase): + + def setUp(self): + if is_monkey_patched(): + raise unittest.SkipTest("Can't test libev with monkey patching") + if LibevConnection is None: + raise unittest.SkipTest('libev does not appear to be installed correctly') + LibevConnection.initialize_reactor() + + def make_connection(self): + c = LibevConnection('1.2.3.4', cql_version='3.0.1') + c._socket = Mock() + c._socket.send.side_effect = lambda x: len(x) + return c + + def test_multi_timer_validation(self, *args): + """ + Verify that timer timeouts are honored appropriately + """ + c = self.make_connection() + c.initialize_reactor() + # Tests timers submitted in order at various timeouts + submit_and_wait_for_completion(self, c, 0, 100, 1, 100) + # Tests timers submitted in reverse order at various timeouts + submit_and_wait_for_completion(self, c, 100, 0, -1, 100) + # Tests timers submitted in varying order at various timeouts + submit_and_wait_for_completion(self, c, 0, 100, 1, 100, True) + + def test_timer_cancellation(self, *args): + """ + Verify that timer cancellation is honored + """ + + # Various lists for tracking callback stage + connection = self.make_connection() + timeout = .1 + callback = TimerCallback(timeout) + timer = connection.create_timer(timeout, callback.invoke) + timer.cancel() + # Release context allow for timer thread to run. + time.sleep(.2) + timer_manager = connection._libevloop._timers + # Assert that the cancellation was honored + self.assertFalse(timer_manager._queue) + self.assertFalse(timer_manager._new_timers) + self.assertFalse(callback.was_invoked()) + diff --git a/tests/unit/io/test_twistedreactor.py b/tests/unit/io/test_twistedreactor.py index d2142b09..7dcfa560 100644 --- a/tests/unit/io/test_twistedreactor.py +++ b/tests/unit/io/test_twistedreactor.py @@ -17,6 +17,7 @@ try: except ImportError: import unittest from mock import Mock, patch +import time try: from twisted.test import proto_helpers @@ -26,6 +27,54 @@ except ImportError: twistedreactor = None # NOQA from cassandra.connection import _Frame +from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback + + +class TestTwistedTimer(unittest.TestCase): + """ + Simple test class that is used to validate that the TimerManager, and timer + classes function appropriately with the twisted infrastructure + """ + + def setUp(self): + if twistedreactor is None: + raise unittest.SkipTest("Twisted libraries not available") + twistedreactor.TwistedConnection.initialize_reactor() + + def test_multi_timer_validation(self): + """ + Verify that the timers are called in the correct order + """ + twistedreactor.TwistedConnection.initialize_reactor() + connection = twistedreactor.TwistedConnection('1.2.3.4', + cql_version='3.0.1') + # Tests timers submitted in order at various timeouts + submit_and_wait_for_completion(self, connection, 0, 100, 1, 100) + # Tests timers submitted in reverse order at various timeouts + submit_and_wait_for_completion(self, connection, 100, 0, -1, 100) + # Tests timers submitted in varying order at various timeouts + submit_and_wait_for_completion(self, connection, 0, 100, 1, 100, True) + + def test_timer_cancellation(self, *args): + """ + Verify that timer cancellation is honored + """ + + # Various lists for tracking callback stage + connection = twistedreactor.TwistedConnection('1.2.3.4', + cql_version='3.0.1') + timeout = .1 + callback = TimerCallback(timeout) + timer = connection.create_timer(timeout, callback.invoke) + timer.cancel() + # Release context allow for timer thread to run. + time.sleep(.2) + timer_manager = connection._loop._timers + # Assert that the cancellation was honored + self.assertFalse(timer_manager._queue) + self.assertFalse(timer_manager._new_timers) + self.assertFalse(callback.was_invoked()) + class TestTwistedProtocol(unittest.TestCase): @@ -96,8 +145,6 @@ class TestTwistedConnection(unittest.TestCase): twistedreactor.TwistedConnection.initialize_reactor() self.reactor_cft_patcher = patch( 'twisted.internet.reactor.callFromThread') - self.reactor_running_patcher = patch( - 'twisted.internet.reactor.running', False) self.reactor_run_patcher = patch('twisted.internet.reactor.run') self.mock_reactor_cft = self.reactor_cft_patcher.start() self.mock_reactor_run = self.reactor_run_patcher.start() @@ -107,7 +154,6 @@ class TestTwistedConnection(unittest.TestCase): def tearDown(self): self.reactor_cft_patcher.stop() self.reactor_run_patcher.stop() - self.obj_ut._loop._cleanup() def test_connection_initialization(self): """ @@ -196,3 +242,4 @@ class TestTwistedConnection(unittest.TestCase): self.obj_ut.push('123 pickup') self.mock_reactor_cft.assert_called_with( self.obj_ut.connector.transport.write, '123 pickup') + diff --git a/tests/unit/io/utils.py b/tests/unit/io/utils.py new file mode 100644 index 00000000..26307ffa --- /dev/null +++ b/tests/unit/io/utils.py @@ -0,0 +1,107 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + + +class TimerCallback(): + + invoked = False + created_time = 0 + invoked_time = 0 + expected_wait = 0 + + def __init__(self, expected_wait): + self.invoked = False + self.created_time = time.time() + self.expected_wait = expected_wait + + def invoke(self): + self.invoked_time = time.time() + self.invoked = True + + def was_invoked(self): + return self.invoked + + def get_wait_time(self): + elapsed_time = self.invoked_time - self.created_time + return elapsed_time + + def wait_match_excepted(self): + if self.expected_wait-.01 <= self.get_wait_time() <= self.expected_wait+.01: + return True + return False + + +def get_timeout(gross_time, start, end, precision, split_range): + """ + A way to generate varying timeouts based on ranges + :param gross_time: Some integer between start and end + :param start: the start value of the range + :param end: the end value of the range + :param precision: the precision to use to generate the timeout. + :param split_range: generate values from both ends + :return: a timeout value to use + """ + if(split_range): + top_num = float(end)/precision + bottom_num = float(start)/precision + if gross_time % 2 == 0: + timeout = top_num - float(gross_time)/precision + else: + timeout = bottom_num + float(gross_time)/precision + + else: + timeout = float(gross_time)/precision + + return timeout + + +def submit_and_wait_for_completion(unit_test, connection, start, end, increment, precision, split_range=False): + """ + This will submit a number of timers to the provided connection. It will then ensure that the corresponding + callback is invoked in the appropriate amount of time. + :param unit_test: Invoking unit tests + :param connection: Connection to create the timer on. + :param start: Lower bound of range. + :param end: Upper bound of the time range + :param increment: +1, or -1 + :param precision: 100 for centisecond, 1000 for milliseconds + :param split_range: True to split the range between incrementing and decrementing. + """ + + # Various lists for tracking callback as completed or pending + pending_callbacks = [] + completed_callbacks = [] + + # submit timers with various timeouts + for gross_time in range(start, end, increment): + timeout = get_timeout(gross_time, start, end, precision, split_range) + callback = TimerCallback(timeout) + connection.create_timer(timeout, callback.invoke) + pending_callbacks.append(callback) + + # wait for all the callbacks associated with the timers to be invoked + while len(pending_callbacks) is not 0: + for callback in pending_callbacks: + if callback.was_invoked(): + pending_callbacks.remove(callback) + completed_callbacks.append(callback) + time.sleep(.1) + + # ensure they are all called back in a timely fashion + for callback in completed_callbacks: + unit_test.assertAlmostEqual(callback.expected_wait, callback.get_wait_time(), delta=.1) + + diff --git a/tox.ini b/tox.ini index b6b0b519..0b4a0b9f 100644 --- a/tox.ini +++ b/tox.ini @@ -7,10 +7,12 @@ deps = nose PyYAML six + [testenv] deps = {[base]deps} - sure==1.2.3 + sure blist + setenv = USE_CASS_EXTERNAL=1 commands = {envpython} setup.py build_ext --inplace nosetests --verbosity=2 tests/unit/ @@ -19,8 +21,17 @@ commands = {envpython} setup.py build_ext --inplace [testenv:py26] deps = {[testenv]deps} unittest2 + twisted + eventlet + gevent # test skipping is different in unittest2 for python 2.7+; let's just use it where needed +[testenv:py27] +deps = {[testenv]deps} + twisted + eventlet + gevent + [testenv:pypy] deps = {[base]deps} commands = {envpython} setup.py build_ext --inplace