@@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
import socket
|
||||||
|
|
||||||
log = logging.getLogger()
|
log = logging.getLogger()
|
||||||
log.setLevel('DEBUG')
|
log.setLevel('DEBUG')
|
||||||
@@ -24,15 +25,18 @@ if not log.handlers:
|
|||||||
log.addHandler(handler)
|
log.addHandler(handler)
|
||||||
|
|
||||||
|
|
||||||
def is_gevent_monkey_patched():
|
|
||||||
return 'gevent.monkey' in sys.modules
|
|
||||||
|
|
||||||
|
|
||||||
def is_eventlet_monkey_patched():
|
def is_eventlet_monkey_patched():
|
||||||
if 'eventlet.patcher' in sys.modules:
|
if 'eventlet.patcher' not in sys.modules:
|
||||||
import eventlet
|
return False
|
||||||
return eventlet.patcher.is_monkey_patched('socket')
|
import eventlet.patcher
|
||||||
return False
|
return eventlet.patcher.is_monkey_patched('socket')
|
||||||
|
|
||||||
|
|
||||||
|
def is_gevent_monkey_patched():
|
||||||
|
if 'gevent.monkey' not in sys.modules:
|
||||||
|
return False
|
||||||
|
import gevent.socket
|
||||||
|
return socket.socket is gevent.socket.socket
|
||||||
|
|
||||||
|
|
||||||
def is_monkey_patched():
|
def is_monkey_patched():
|
||||||
|
|||||||
@@ -12,14 +12,15 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import sys, logging, traceback
|
import sys,logging, traceback, time
|
||||||
|
|
||||||
from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,\
|
from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,\
|
||||||
FunctionFailure
|
FunctionFailure
|
||||||
from cassandra.cluster import Cluster
|
from cassandra.cluster import Cluster
|
||||||
from cassandra.concurrent import execute_concurrent_with_args
|
from cassandra.concurrent import execute_concurrent_with_args
|
||||||
from cassandra.query import SimpleStatement
|
from cassandra.query import SimpleStatement
|
||||||
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster
|
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node
|
||||||
|
from mock import Mock
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import unittest2 as unittest
|
import unittest2 as unittest
|
||||||
@@ -301,3 +302,88 @@ class ClientExceptionTests(unittest.TestCase):
|
|||||||
"""
|
"""
|
||||||
DROP TABLE test3rf.d;
|
DROP TABLE test3rf.d;
|
||||||
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
|
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
|
||||||
|
|
||||||
|
|
||||||
|
class TimeoutTimerTest(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
"""
|
||||||
|
Setup sessions and pause node1
|
||||||
|
"""
|
||||||
|
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
|
||||||
|
self.session = self.cluster.connect()
|
||||||
|
|
||||||
|
# self.node1, self.node2, self.node3 = get_cluster().nodes.values()
|
||||||
|
self.node1 = get_node(1)
|
||||||
|
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
|
||||||
|
self.session = self.cluster.connect()
|
||||||
|
|
||||||
|
ddl = '''
|
||||||
|
CREATE TABLE test3rf.timeout (
|
||||||
|
k int PRIMARY KEY,
|
||||||
|
v int )'''
|
||||||
|
self.session.execute(ddl)
|
||||||
|
self.node1.pause()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
"""
|
||||||
|
Shutdown cluster and resume node1
|
||||||
|
"""
|
||||||
|
self.node1.resume()
|
||||||
|
self.session.execute("DROP TABLE test3rf.timeout")
|
||||||
|
self.cluster.shutdown()
|
||||||
|
|
||||||
|
def test_async_timeouts(self):
|
||||||
|
"""
|
||||||
|
Test to validate that timeouts are honored
|
||||||
|
|
||||||
|
|
||||||
|
Exercise the underlying timeouts, by attempting a query that will timeout. Ensure the default timeout is still
|
||||||
|
honored. Make sure that user timeouts are also honored.
|
||||||
|
|
||||||
|
@since 2.7.0
|
||||||
|
@jira_ticket PYTHON-108
|
||||||
|
@expected_result timeouts should be honored
|
||||||
|
|
||||||
|
@test_category
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Because node1 is stopped these statements will all timeout
|
||||||
|
ss = SimpleStatement('SELECT * FROM test3rf.test', consistency_level=ConsistencyLevel.ALL)
|
||||||
|
|
||||||
|
# Test with default timeout (should be 10)
|
||||||
|
start_time = time.time()
|
||||||
|
future = self.session.execute_async(ss)
|
||||||
|
with self.assertRaises(OperationTimedOut):
|
||||||
|
future.result()
|
||||||
|
end_time = time.time()
|
||||||
|
total_time = end_time-start_time
|
||||||
|
expected_time = self.session.default_timeout
|
||||||
|
# check timeout and ensure it's within a reasonable range
|
||||||
|
self.assertAlmostEqual(expected_time, total_time, delta=.05)
|
||||||
|
|
||||||
|
# Test with user defined timeout (Should be 1)
|
||||||
|
start_time = time.time()
|
||||||
|
future = self.session.execute_async(ss, timeout=1)
|
||||||
|
mock_callback = Mock(return_value=None)
|
||||||
|
mock_errorback = Mock(return_value=None)
|
||||||
|
future.add_callback(mock_callback)
|
||||||
|
future.add_errback(mock_errorback)
|
||||||
|
|
||||||
|
with self.assertRaises(OperationTimedOut):
|
||||||
|
future.result()
|
||||||
|
end_time = time.time()
|
||||||
|
total_time = end_time-start_time
|
||||||
|
expected_time = 1
|
||||||
|
# check timeout and ensure it's within a reasonable range
|
||||||
|
self.assertAlmostEqual(expected_time, total_time, delta=.05)
|
||||||
|
self.assertTrue(mock_errorback.called)
|
||||||
|
self.assertFalse(mock_callback.called)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
37
tests/unit/io/eventlet_utils.py
Normal file
37
tests/unit/io/eventlet_utils.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
# Copyright 2013-2015 DataStax, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
import select
|
||||||
|
import socket
|
||||||
|
import thread
|
||||||
|
import Queue
|
||||||
|
import threading
|
||||||
|
import __builtin__
|
||||||
|
import ssl
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
def eventlet_un_patch_all():
|
||||||
|
"""
|
||||||
|
A method to unpatch eventlet monkey patching used for the reactor tests
|
||||||
|
"""
|
||||||
|
|
||||||
|
# These are the modules that are loaded by eventlet we reload them all
|
||||||
|
modules_to_unpatch = [os, select, socket, thread, time, Queue, threading, ssl, __builtin__]
|
||||||
|
for to_unpatch in modules_to_unpatch:
|
||||||
|
reload(to_unpatch)
|
||||||
|
|
||||||
|
|
||||||
56
tests/unit/io/gevent_utils.py
Normal file
56
tests/unit/io/gevent_utils.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
# Copyright 2013-2015 DataStax, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
from gevent import monkey
|
||||||
|
|
||||||
|
|
||||||
|
def gevent_un_patch_all():
|
||||||
|
"""
|
||||||
|
A method to unpatch gevent libraries. These are unloaded
|
||||||
|
in the same order that gevent monkey patch loads theirs.
|
||||||
|
Order cannot be arbitrary. This is used in the unit tests to
|
||||||
|
un monkey patch gevent
|
||||||
|
"""
|
||||||
|
restore_saved_module("os")
|
||||||
|
restore_saved_module("time")
|
||||||
|
restore_saved_module("thread")
|
||||||
|
restore_saved_module("threading")
|
||||||
|
restore_saved_module("_threading_local")
|
||||||
|
restore_saved_module("stdin")
|
||||||
|
restore_saved_module("stdout")
|
||||||
|
restore_saved_module("socket")
|
||||||
|
restore_saved_module("select")
|
||||||
|
restore_saved_module("ssl")
|
||||||
|
restore_saved_module("subprocess")
|
||||||
|
|
||||||
|
|
||||||
|
def restore_saved_module(module):
|
||||||
|
"""
|
||||||
|
gevent monkey patch keeps a list of all patched modules.
|
||||||
|
This will restore the original ones
|
||||||
|
:param module: to unpatch
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Check the saved attributes in geven monkey patch
|
||||||
|
if not (module in monkey.saved):
|
||||||
|
return
|
||||||
|
_module = __import__(module)
|
||||||
|
|
||||||
|
# If it exist unpatch it
|
||||||
|
for attr in monkey.saved[module]:
|
||||||
|
if hasattr(_module, attr):
|
||||||
|
setattr(_module, attr, monkey.saved[module][attr])
|
||||||
|
|
||||||
@@ -21,20 +21,20 @@ except ImportError:
|
|||||||
|
|
||||||
import errno
|
import errno
|
||||||
import math
|
import math
|
||||||
|
import time
|
||||||
from mock import patch, Mock
|
from mock import patch, Mock
|
||||||
import os
|
import os
|
||||||
from six import BytesIO
|
from six import BytesIO
|
||||||
import socket
|
import socket
|
||||||
from socket import error as socket_error
|
from socket import error as socket_error
|
||||||
|
|
||||||
from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT,
|
from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT,
|
||||||
ConnectionException, ProtocolError)
|
ConnectionException, ProtocolError,Timer)
|
||||||
from cassandra.io.asyncorereactor import AsyncoreConnection
|
from cassandra.io.asyncorereactor import AsyncoreConnection
|
||||||
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
|
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
|
||||||
SupportedMessage, ReadyMessage, ServerError)
|
SupportedMessage, ReadyMessage, ServerError)
|
||||||
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
|
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
|
||||||
|
|
||||||
from tests import is_monkey_patched
|
from tests import is_monkey_patched
|
||||||
|
from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback
|
||||||
|
|
||||||
|
|
||||||
class AsyncoreConnectionTest(unittest.TestCase):
|
class AsyncoreConnectionTest(unittest.TestCase):
|
||||||
@@ -300,3 +300,37 @@ class AsyncoreConnectionTest(unittest.TestCase):
|
|||||||
|
|
||||||
self.assertTrue(c.connected_event.is_set())
|
self.assertTrue(c.connected_event.is_set())
|
||||||
self.assertFalse(c.is_defunct)
|
self.assertFalse(c.is_defunct)
|
||||||
|
|
||||||
|
def test_multi_timer_validation(self, *args):
|
||||||
|
"""
|
||||||
|
Verify that timer timeouts are honored appropriately
|
||||||
|
"""
|
||||||
|
c = self.make_connection()
|
||||||
|
# Tests timers submitted in order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, AsyncoreConnection, 0, 100, 1, 100)
|
||||||
|
# Tests timers submitted in reverse order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, AsyncoreConnection, 100, 0, -1, 100)
|
||||||
|
# Tests timers submitted in varying order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, AsyncoreConnection, 0, 100, 1, 100, True)
|
||||||
|
|
||||||
|
def test_timer_cancellation(self):
|
||||||
|
"""
|
||||||
|
Verify that timer cancellation is honored
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Various lists for tracking callback stage
|
||||||
|
connection = self.make_connection()
|
||||||
|
timeout = .1
|
||||||
|
callback = TimerCallback(timeout)
|
||||||
|
timer = connection.create_timer(timeout, callback.invoke)
|
||||||
|
timer.cancel()
|
||||||
|
# Release context allow for timer thread to run.
|
||||||
|
time.sleep(.2)
|
||||||
|
timer_manager = connection._loop._timers
|
||||||
|
# Assert that the cancellation was honored
|
||||||
|
self.assertFalse(timer_manager._queue)
|
||||||
|
self.assertFalse(timer_manager._new_timers)
|
||||||
|
self.assertFalse(callback.was_invoked())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
67
tests/unit/io/test_eventletreactor.py
Normal file
67
tests/unit/io/test_eventletreactor.py
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
# Copyright 2013-2015 DataStax, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
import unittest2 as unittest
|
||||||
|
except ImportError:
|
||||||
|
import unittest # noqa
|
||||||
|
|
||||||
|
from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback
|
||||||
|
from tests import is_eventlet_monkey_patched
|
||||||
|
import time
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cassandra.io.eventletreactor import EventletConnection
|
||||||
|
except ImportError:
|
||||||
|
EventletConnection = None # noqa
|
||||||
|
|
||||||
|
|
||||||
|
class EventletTimerTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
if EventletConnection is None:
|
||||||
|
raise unittest.SkipTest("Eventlet libraries not available")
|
||||||
|
if not is_eventlet_monkey_patched():
|
||||||
|
raise unittest.SkipTest("Can't test eventlet without monkey patching")
|
||||||
|
EventletConnection.initialize_reactor()
|
||||||
|
|
||||||
|
def test_multi_timer_validation(self, *args):
|
||||||
|
"""
|
||||||
|
Verify that timer timeouts are honored appropriately
|
||||||
|
"""
|
||||||
|
# Tests timers submitted in order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, EventletConnection, 0, 100, 1, 100)
|
||||||
|
# Tests timers submitted in reverse order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, EventletConnection, 100, 0, -1, 100)
|
||||||
|
# Tests timers submitted in varying order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, EventletConnection, 0, 100, 1, 100, True)
|
||||||
|
|
||||||
|
def test_timer_cancellation(self):
|
||||||
|
"""
|
||||||
|
Verify that timer cancellation is honored
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Various lists for tracking callback stage
|
||||||
|
timeout = .1
|
||||||
|
callback = TimerCallback(timeout)
|
||||||
|
timer = EventletConnection.create_timer(timeout, callback.invoke)
|
||||||
|
timer.cancel()
|
||||||
|
# Release context allow for timer thread to run.
|
||||||
|
time.sleep(.2)
|
||||||
|
timer_manager = EventletConnection._timers
|
||||||
|
# Assert that the cancellation was honored
|
||||||
|
self.assertFalse(timer_manager._queue)
|
||||||
|
self.assertFalse(timer_manager._new_timers)
|
||||||
|
self.assertFalse(callback.was_invoked())
|
||||||
78
tests/unit/io/test_geventreactor.py
Normal file
78
tests/unit/io/test_geventreactor.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
# Copyright 2013-2015 DataStax, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
try:
|
||||||
|
import unittest2 as unittest
|
||||||
|
except ImportError:
|
||||||
|
import unittest # noqa
|
||||||
|
|
||||||
|
import time
|
||||||
|
from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback
|
||||||
|
from tests import is_gevent_monkey_patched
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cassandra.io.geventreactor import GeventConnection
|
||||||
|
import gevent.monkey
|
||||||
|
from gevent_utils import gevent_un_patch_all
|
||||||
|
except ImportError:
|
||||||
|
GeventConnection = None # noqa
|
||||||
|
|
||||||
|
|
||||||
|
class GeventTimerTest(unittest.TestCase):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
if GeventConnection is not None:
|
||||||
|
if not is_gevent_monkey_patched():
|
||||||
|
gevent.monkey.patch_all()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
if is_gevent_monkey_patched():
|
||||||
|
gevent_un_patch_all()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
if not is_gevent_monkey_patched():
|
||||||
|
raise unittest.SkipTest("Can't test gevent without monkey patching")
|
||||||
|
GeventConnection.initialize_reactor()
|
||||||
|
|
||||||
|
def test_multi_timer_validation(self, *args):
|
||||||
|
"""
|
||||||
|
Verify that timer timeouts are honored appropriately
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Tests timers submitted in order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, GeventConnection, 0, 100, 1, 100)
|
||||||
|
# Tests timers submitted in reverse order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, GeventConnection, 100, 0, -1, 100)
|
||||||
|
# Tests timers submitted in varying order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, GeventConnection, 0, 100, 1, 100, True),
|
||||||
|
|
||||||
|
def test_timer_cancellation(self):
|
||||||
|
"""
|
||||||
|
Verify that timer cancellation is honored
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Various lists for tracking callback stage
|
||||||
|
timeout = .1
|
||||||
|
callback = TimerCallback(timeout)
|
||||||
|
timer = GeventConnection.create_timer(timeout, callback.invoke)
|
||||||
|
timer.cancel()
|
||||||
|
# Release context allow for timer thread to run.
|
||||||
|
time.sleep(.2)
|
||||||
|
timer_manager = GeventConnection._timers
|
||||||
|
# Assert that the cancellation was honored
|
||||||
|
self.assertFalse(timer_manager._queue)
|
||||||
|
self.assertFalse(timer_manager._new_timers)
|
||||||
|
self.assertFalse(callback.was_invoked())
|
||||||
@@ -24,6 +24,7 @@ import six
|
|||||||
from six import BytesIO
|
from six import BytesIO
|
||||||
from socket import error as socket_error
|
from socket import error as socket_error
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT,
|
from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT,
|
||||||
ConnectionException, ProtocolError)
|
ConnectionException, ProtocolError)
|
||||||
@@ -31,6 +32,10 @@ from cassandra.connection import (HEADER_DIRECTION_TO_CLIENT,
|
|||||||
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
|
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
|
||||||
SupportedMessage, ReadyMessage, ServerError)
|
SupportedMessage, ReadyMessage, ServerError)
|
||||||
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
|
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
|
||||||
|
from tests.unit.io.utils import TimerCallback
|
||||||
|
from tests.unit.io.utils import submit_and_wait_for_completion
|
||||||
|
from tests import is_monkey_patched
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from cassandra.io.libevreactor import LibevConnection
|
from cassandra.io.libevreactor import LibevConnection
|
||||||
@@ -46,7 +51,7 @@ except ImportError:
|
|||||||
class LibevConnectionTest(unittest.TestCase):
|
class LibevConnectionTest(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
if 'gevent.monkey' in sys.modules:
|
if is_monkey_patched():
|
||||||
raise unittest.SkipTest("Can't test libev with monkey patching")
|
raise unittest.SkipTest("Can't test libev with monkey patching")
|
||||||
if LibevConnection is None:
|
if LibevConnection is None:
|
||||||
raise unittest.SkipTest('libev does not appear to be installed correctly')
|
raise unittest.SkipTest('libev does not appear to be installed correctly')
|
||||||
@@ -290,4 +295,4 @@ class LibevConnectionTest(unittest.TestCase):
|
|||||||
c.handle_read(None, 0)
|
c.handle_read(None, 0)
|
||||||
|
|
||||||
self.assertTrue(c.connected_event.is_set())
|
self.assertTrue(c.connected_event.is_set())
|
||||||
self.assertFalse(c.is_defunct)
|
self.assertFalse(c.is_defunct)
|
||||||
82
tests/unit/io/test_libevtimer.py
Normal file
82
tests/unit/io/test_libevtimer.py
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
# Copyright 2013-2015 DataStax, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
try:
|
||||||
|
import unittest2 as unittest
|
||||||
|
except ImportError:
|
||||||
|
import unittest # noqa
|
||||||
|
|
||||||
|
|
||||||
|
from mock import patch, Mock
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback
|
||||||
|
from tests import is_monkey_patched
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cassandra.io.libevreactor import LibevConnection
|
||||||
|
except ImportError:
|
||||||
|
LibevConnection = None # noqa
|
||||||
|
|
||||||
|
|
||||||
|
@patch('socket.socket')
|
||||||
|
@patch('cassandra.io.libevwrapper.IO')
|
||||||
|
class LibevTimerTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
if is_monkey_patched():
|
||||||
|
raise unittest.SkipTest("Can't test libev with monkey patching")
|
||||||
|
if LibevConnection is None:
|
||||||
|
raise unittest.SkipTest('libev does not appear to be installed correctly')
|
||||||
|
LibevConnection.initialize_reactor()
|
||||||
|
|
||||||
|
def make_connection(self):
|
||||||
|
c = LibevConnection('1.2.3.4', cql_version='3.0.1')
|
||||||
|
c._socket = Mock()
|
||||||
|
c._socket.send.side_effect = lambda x: len(x)
|
||||||
|
return c
|
||||||
|
|
||||||
|
def test_multi_timer_validation(self, *args):
|
||||||
|
"""
|
||||||
|
Verify that timer timeouts are honored appropriately
|
||||||
|
"""
|
||||||
|
c = self.make_connection()
|
||||||
|
c.initialize_reactor()
|
||||||
|
# Tests timers submitted in order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, c, 0, 100, 1, 100)
|
||||||
|
# Tests timers submitted in reverse order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, c, 100, 0, -1, 100)
|
||||||
|
# Tests timers submitted in varying order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, c, 0, 100, 1, 100, True)
|
||||||
|
|
||||||
|
def test_timer_cancellation(self, *args):
|
||||||
|
"""
|
||||||
|
Verify that timer cancellation is honored
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Various lists for tracking callback stage
|
||||||
|
connection = self.make_connection()
|
||||||
|
timeout = .1
|
||||||
|
callback = TimerCallback(timeout)
|
||||||
|
timer = connection.create_timer(timeout, callback.invoke)
|
||||||
|
timer.cancel()
|
||||||
|
# Release context allow for timer thread to run.
|
||||||
|
time.sleep(.2)
|
||||||
|
timer_manager = connection._libevloop._timers
|
||||||
|
# Assert that the cancellation was honored
|
||||||
|
self.assertFalse(timer_manager._queue)
|
||||||
|
self.assertFalse(timer_manager._new_timers)
|
||||||
|
self.assertFalse(callback.was_invoked())
|
||||||
|
|
||||||
@@ -17,6 +17,7 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
import unittest
|
import unittest
|
||||||
from mock import Mock, patch
|
from mock import Mock, patch
|
||||||
|
import time
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from twisted.test import proto_helpers
|
from twisted.test import proto_helpers
|
||||||
@@ -26,6 +27,54 @@ except ImportError:
|
|||||||
twistedreactor = None # NOQA
|
twistedreactor = None # NOQA
|
||||||
|
|
||||||
from cassandra.connection import _Frame
|
from cassandra.connection import _Frame
|
||||||
|
from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback
|
||||||
|
|
||||||
|
|
||||||
|
class TestTwistedTimer(unittest.TestCase):
|
||||||
|
"""
|
||||||
|
Simple test class that is used to validate that the TimerManager, and timer
|
||||||
|
classes function appropriately with the twisted infrastructure
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
if twistedreactor is None:
|
||||||
|
raise unittest.SkipTest("Twisted libraries not available")
|
||||||
|
twistedreactor.TwistedConnection.initialize_reactor()
|
||||||
|
|
||||||
|
def test_multi_timer_validation(self):
|
||||||
|
"""
|
||||||
|
Verify that the timers are called in the correct order
|
||||||
|
"""
|
||||||
|
twistedreactor.TwistedConnection.initialize_reactor()
|
||||||
|
connection = twistedreactor.TwistedConnection('1.2.3.4',
|
||||||
|
cql_version='3.0.1')
|
||||||
|
# Tests timers submitted in order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, connection, 0, 100, 1, 100)
|
||||||
|
# Tests timers submitted in reverse order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, connection, 100, 0, -1, 100)
|
||||||
|
# Tests timers submitted in varying order at various timeouts
|
||||||
|
submit_and_wait_for_completion(self, connection, 0, 100, 1, 100, True)
|
||||||
|
|
||||||
|
def test_timer_cancellation(self, *args):
|
||||||
|
"""
|
||||||
|
Verify that timer cancellation is honored
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Various lists for tracking callback stage
|
||||||
|
connection = twistedreactor.TwistedConnection('1.2.3.4',
|
||||||
|
cql_version='3.0.1')
|
||||||
|
timeout = .1
|
||||||
|
callback = TimerCallback(timeout)
|
||||||
|
timer = connection.create_timer(timeout, callback.invoke)
|
||||||
|
timer.cancel()
|
||||||
|
# Release context allow for timer thread to run.
|
||||||
|
time.sleep(.2)
|
||||||
|
timer_manager = connection._loop._timers
|
||||||
|
# Assert that the cancellation was honored
|
||||||
|
self.assertFalse(timer_manager._queue)
|
||||||
|
self.assertFalse(timer_manager._new_timers)
|
||||||
|
self.assertFalse(callback.was_invoked())
|
||||||
|
|
||||||
|
|
||||||
class TestTwistedProtocol(unittest.TestCase):
|
class TestTwistedProtocol(unittest.TestCase):
|
||||||
|
|
||||||
@@ -96,8 +145,6 @@ class TestTwistedConnection(unittest.TestCase):
|
|||||||
twistedreactor.TwistedConnection.initialize_reactor()
|
twistedreactor.TwistedConnection.initialize_reactor()
|
||||||
self.reactor_cft_patcher = patch(
|
self.reactor_cft_patcher = patch(
|
||||||
'twisted.internet.reactor.callFromThread')
|
'twisted.internet.reactor.callFromThread')
|
||||||
self.reactor_running_patcher = patch(
|
|
||||||
'twisted.internet.reactor.running', False)
|
|
||||||
self.reactor_run_patcher = patch('twisted.internet.reactor.run')
|
self.reactor_run_patcher = patch('twisted.internet.reactor.run')
|
||||||
self.mock_reactor_cft = self.reactor_cft_patcher.start()
|
self.mock_reactor_cft = self.reactor_cft_patcher.start()
|
||||||
self.mock_reactor_run = self.reactor_run_patcher.start()
|
self.mock_reactor_run = self.reactor_run_patcher.start()
|
||||||
@@ -107,7 +154,6 @@ class TestTwistedConnection(unittest.TestCase):
|
|||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.reactor_cft_patcher.stop()
|
self.reactor_cft_patcher.stop()
|
||||||
self.reactor_run_patcher.stop()
|
self.reactor_run_patcher.stop()
|
||||||
self.obj_ut._loop._cleanup()
|
|
||||||
|
|
||||||
def test_connection_initialization(self):
|
def test_connection_initialization(self):
|
||||||
"""
|
"""
|
||||||
@@ -196,3 +242,4 @@ class TestTwistedConnection(unittest.TestCase):
|
|||||||
self.obj_ut.push('123 pickup')
|
self.obj_ut.push('123 pickup')
|
||||||
self.mock_reactor_cft.assert_called_with(
|
self.mock_reactor_cft.assert_called_with(
|
||||||
self.obj_ut.connector.transport.write, '123 pickup')
|
self.obj_ut.connector.transport.write, '123 pickup')
|
||||||
|
|
||||||
|
|||||||
107
tests/unit/io/utils.py
Normal file
107
tests/unit/io/utils.py
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
# Copyright 2013-2015 DataStax, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class TimerCallback():
|
||||||
|
|
||||||
|
invoked = False
|
||||||
|
created_time = 0
|
||||||
|
invoked_time = 0
|
||||||
|
expected_wait = 0
|
||||||
|
|
||||||
|
def __init__(self, expected_wait):
|
||||||
|
self.invoked = False
|
||||||
|
self.created_time = time.time()
|
||||||
|
self.expected_wait = expected_wait
|
||||||
|
|
||||||
|
def invoke(self):
|
||||||
|
self.invoked_time = time.time()
|
||||||
|
self.invoked = True
|
||||||
|
|
||||||
|
def was_invoked(self):
|
||||||
|
return self.invoked
|
||||||
|
|
||||||
|
def get_wait_time(self):
|
||||||
|
elapsed_time = self.invoked_time - self.created_time
|
||||||
|
return elapsed_time
|
||||||
|
|
||||||
|
def wait_match_excepted(self):
|
||||||
|
if self.expected_wait-.01 <= self.get_wait_time() <= self.expected_wait+.01:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_timeout(gross_time, start, end, precision, split_range):
|
||||||
|
"""
|
||||||
|
A way to generate varying timeouts based on ranges
|
||||||
|
:param gross_time: Some integer between start and end
|
||||||
|
:param start: the start value of the range
|
||||||
|
:param end: the end value of the range
|
||||||
|
:param precision: the precision to use to generate the timeout.
|
||||||
|
:param split_range: generate values from both ends
|
||||||
|
:return: a timeout value to use
|
||||||
|
"""
|
||||||
|
if(split_range):
|
||||||
|
top_num = float(end)/precision
|
||||||
|
bottom_num = float(start)/precision
|
||||||
|
if gross_time % 2 == 0:
|
||||||
|
timeout = top_num - float(gross_time)/precision
|
||||||
|
else:
|
||||||
|
timeout = bottom_num + float(gross_time)/precision
|
||||||
|
|
||||||
|
else:
|
||||||
|
timeout = float(gross_time)/precision
|
||||||
|
|
||||||
|
return timeout
|
||||||
|
|
||||||
|
|
||||||
|
def submit_and_wait_for_completion(unit_test, connection, start, end, increment, precision, split_range=False):
|
||||||
|
"""
|
||||||
|
This will submit a number of timers to the provided connection. It will then ensure that the corresponding
|
||||||
|
callback is invoked in the appropriate amount of time.
|
||||||
|
:param unit_test: Invoking unit tests
|
||||||
|
:param connection: Connection to create the timer on.
|
||||||
|
:param start: Lower bound of range.
|
||||||
|
:param end: Upper bound of the time range
|
||||||
|
:param increment: +1, or -1
|
||||||
|
:param precision: 100 for centisecond, 1000 for milliseconds
|
||||||
|
:param split_range: True to split the range between incrementing and decrementing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Various lists for tracking callback as completed or pending
|
||||||
|
pending_callbacks = []
|
||||||
|
completed_callbacks = []
|
||||||
|
|
||||||
|
# submit timers with various timeouts
|
||||||
|
for gross_time in range(start, end, increment):
|
||||||
|
timeout = get_timeout(gross_time, start, end, precision, split_range)
|
||||||
|
callback = TimerCallback(timeout)
|
||||||
|
connection.create_timer(timeout, callback.invoke)
|
||||||
|
pending_callbacks.append(callback)
|
||||||
|
|
||||||
|
# wait for all the callbacks associated with the timers to be invoked
|
||||||
|
while len(pending_callbacks) is not 0:
|
||||||
|
for callback in pending_callbacks:
|
||||||
|
if callback.was_invoked():
|
||||||
|
pending_callbacks.remove(callback)
|
||||||
|
completed_callbacks.append(callback)
|
||||||
|
time.sleep(.1)
|
||||||
|
|
||||||
|
# ensure they are all called back in a timely fashion
|
||||||
|
for callback in completed_callbacks:
|
||||||
|
unit_test.assertAlmostEqual(callback.expected_wait, callback.get_wait_time(), delta=.1)
|
||||||
|
|
||||||
|
|
||||||
13
tox.ini
13
tox.ini
@@ -7,10 +7,12 @@ deps = nose
|
|||||||
PyYAML
|
PyYAML
|
||||||
six
|
six
|
||||||
|
|
||||||
|
|
||||||
[testenv]
|
[testenv]
|
||||||
deps = {[base]deps}
|
deps = {[base]deps}
|
||||||
sure==1.2.3
|
sure
|
||||||
blist
|
blist
|
||||||
|
|
||||||
setenv = USE_CASS_EXTERNAL=1
|
setenv = USE_CASS_EXTERNAL=1
|
||||||
commands = {envpython} setup.py build_ext --inplace
|
commands = {envpython} setup.py build_ext --inplace
|
||||||
nosetests --verbosity=2 tests/unit/
|
nosetests --verbosity=2 tests/unit/
|
||||||
@@ -19,8 +21,17 @@ commands = {envpython} setup.py build_ext --inplace
|
|||||||
[testenv:py26]
|
[testenv:py26]
|
||||||
deps = {[testenv]deps}
|
deps = {[testenv]deps}
|
||||||
unittest2
|
unittest2
|
||||||
|
twisted
|
||||||
|
eventlet
|
||||||
|
gevent
|
||||||
# test skipping is different in unittest2 for python 2.7+; let's just use it where needed
|
# test skipping is different in unittest2 for python 2.7+; let's just use it where needed
|
||||||
|
|
||||||
|
[testenv:py27]
|
||||||
|
deps = {[testenv]deps}
|
||||||
|
twisted
|
||||||
|
eventlet
|
||||||
|
gevent
|
||||||
|
|
||||||
[testenv:pypy]
|
[testenv:pypy]
|
||||||
deps = {[base]deps}
|
deps = {[base]deps}
|
||||||
commands = {envpython} setup.py build_ext --inplace
|
commands = {envpython} setup.py build_ext --inplace
|
||||||
|
|||||||
Reference in New Issue
Block a user