Skipped by default eventlet and gevent tests in Unit tests
This commit is contained in:
@@ -20,6 +20,7 @@ import logging
|
||||
import sys
|
||||
import socket
|
||||
import platform
|
||||
import os
|
||||
|
||||
log = logging.getLogger()
|
||||
log.setLevel('DEBUG')
|
||||
@@ -57,5 +58,9 @@ def is_eventlet_time_monkey_patched():
|
||||
def is_monkey_patched():
|
||||
return is_gevent_monkey_patched() or is_eventlet_monkey_patched()
|
||||
|
||||
|
||||
MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False))
|
||||
|
||||
notwindows = unittest.skipUnless(not "Windows" in platform.system(), "This test is not adequate for windows")
|
||||
notpypy = unittest.skipUnless(not '__pypy__' in sys.builtin_module_names, "This tests is not suitable for pypy")
|
||||
notmonkeypatch = unittest.skipUnless(MONKEY_PATCH_LOOP, "Skpping this test because monkey patching is required")
|
@@ -19,29 +19,34 @@ except ImportError:
|
||||
import unittest # noqa
|
||||
|
||||
from tests.unit.io.utils import TimerConnectionTests
|
||||
from tests.unit.io.eventlet_utils import restore_saved_module
|
||||
from tests import notpypy
|
||||
from tests.unit.io.eventlet_utils import restore_saved_module, eventlet_un_patch_all
|
||||
from tests import notpypy, MONKEY_PATCH_LOOP
|
||||
import time
|
||||
from eventlet import monkey_patch
|
||||
from eventlet import monkey_patch, kill
|
||||
import threading
|
||||
|
||||
try:
|
||||
from cassandra.io.eventletreactor import EventletConnection
|
||||
except ImportError:
|
||||
EventletConnection = None # noqa
|
||||
|
||||
|
||||
class EventletTimerTest(unittest.TestCase, TimerConnectionTests):
|
||||
# There are some issues with some versions of pypy and eventlet
|
||||
@notpypy
|
||||
def setUp(self):
|
||||
#There are some issues with some versions of pypy and eventlet
|
||||
if not MONKEY_PATCH_LOOP:
|
||||
raise unittest.SkipTest("Skpping because monkey patching may affect the rest of the tests")
|
||||
|
||||
if not EventletConnection:
|
||||
raise unittest.SkipTest("Can't test eventlet without monkey patching")
|
||||
monkey_patch(time=True)
|
||||
|
||||
monkey_patch(thread=False, time=True)
|
||||
EventletConnection.initialize_reactor()
|
||||
|
||||
def tearDown(self):
|
||||
restore_saved_module(time)
|
||||
kill(EventletConnection._timeout_watcher)
|
||||
EventletConnection._timers = None
|
||||
restore_saved_module(time)
|
||||
|
||||
def getClass(self):
|
||||
return EventletConnection
|
||||
|
@@ -18,8 +18,8 @@ except ImportError:
|
||||
import unittest # noqa
|
||||
|
||||
|
||||
from tests import is_gevent_time_monkey_patched, is_eventlet_monkey_patched
|
||||
from tests.unit.io.utils import TimerConnectionTests
|
||||
from tests import MONKEY_PATCH_LOOP
|
||||
|
||||
try:
|
||||
from cassandra.io.geventreactor import GeventConnection
|
||||
@@ -31,6 +31,9 @@ except ImportError:
|
||||
|
||||
class GeventTimerTest(unittest.TestCase, TimerConnectionTests):
|
||||
def setUp(self):
|
||||
if not MONKEY_PATCH_LOOP:
|
||||
raise unittest.SkipTest("Skpping because monkey patching may affect the rest of the tests")
|
||||
|
||||
if not GeventConnection:
|
||||
raise unittest.SkipTest("Can't test gevent without monkey patching")
|
||||
gevent.monkey.patch_time()
|
||||
|
@@ -22,6 +22,7 @@ from mock import patch, Mock
|
||||
import time
|
||||
|
||||
from tests.unit.io.utils import submit_and_wait_for_completion, TimerCallback
|
||||
from tests.unit.io.utils import TimerConnectionTests
|
||||
from tests import is_monkey_patched
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user