Delay initialization of shared connection state
This makes it much simpler to avoid sharing connection state across multiple processes. Fixes PYTHON-60
This commit is contained in:
@@ -530,6 +530,7 @@ class Cluster(object):
|
||||
raise Exception("Cluster is already shut down")
|
||||
|
||||
if not self._is_setup:
|
||||
self.connection_class.initialize_reactor()
|
||||
atexit.register(partial(_shutdown_cluster, self))
|
||||
for address in self.contact_points:
|
||||
host = self.add_host(address, signal=False)
|
||||
|
||||
@@ -179,6 +179,14 @@ class Connection(object):
|
||||
|
||||
self.lock = RLock()
|
||||
|
||||
@classmethod
|
||||
def initialize_reactor(self):
|
||||
"""
|
||||
Called once by Cluster.connect(). This should be used by implementations
|
||||
to set up any resources that will be shared across connections.
|
||||
"""
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@@ -135,12 +135,17 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
|
||||
module in the Python standard library for its event loop.
|
||||
"""
|
||||
|
||||
_loop = AsyncoreLoop()
|
||||
_loop = None
|
||||
|
||||
_total_reqd_bytes = 0
|
||||
_writable = False
|
||||
_readable = False
|
||||
|
||||
@classmethod
|
||||
def initialize_reactor(cls):
|
||||
if not cls._loop:
|
||||
cls._loop = AsyncoreLoop()
|
||||
|
||||
@classmethod
|
||||
def factory(cls, *args, **kwargs):
|
||||
timeout = kwargs.pop('timeout', 5.0)
|
||||
|
||||
@@ -200,14 +200,18 @@ class LibevConnection(Connection):
|
||||
"""
|
||||
An implementation of :class:`.Connection` that uses libev for its event loop.
|
||||
"""
|
||||
_libevloop = LibevLoop()
|
||||
|
||||
_libevloop = None
|
||||
_write_watcher_is_active = False
|
||||
_total_reqd_bytes = 0
|
||||
_read_watcher = None
|
||||
_write_watcher = None
|
||||
_socket = None
|
||||
|
||||
@classmethod
|
||||
def initialize_reactor(cls):
|
||||
if not cls._libevloop:
|
||||
cls._libevloop = LibevLoop()
|
||||
|
||||
@classmethod
|
||||
def factory(cls, *args, **kwargs):
|
||||
timeout = kwargs.pop('timeout', 5.0)
|
||||
|
||||
@@ -39,6 +39,9 @@ class ConnectionTest(object):
|
||||
|
||||
klass = None
|
||||
|
||||
def setUp(self):
|
||||
self.klass.initialize_reactor()
|
||||
|
||||
def get_connection(self):
|
||||
"""
|
||||
Helper method to solve automated testing issues within Jenkins.
|
||||
@@ -216,6 +219,7 @@ class AsyncoreConnectionTest(ConnectionTest, unittest.TestCase):
|
||||
def setUp(self):
|
||||
if 'gevent.monkey' in sys.modules:
|
||||
raise unittest.SkipTest("Can't test libev with gevent monkey patching")
|
||||
ConnectionTest.setUp(self)
|
||||
|
||||
|
||||
class LibevConnectionTest(ConnectionTest, unittest.TestCase):
|
||||
@@ -228,3 +232,4 @@ class LibevConnectionTest(ConnectionTest, unittest.TestCase):
|
||||
if LibevConnection is None:
|
||||
raise unittest.SkipTest(
|
||||
'libev does not appear to be installed properly')
|
||||
ConnectionTest.setUp(self)
|
||||
|
||||
@@ -42,6 +42,7 @@ class AsyncoreConnectionTest(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
AsyncoreConnection.initialize_reactor()
|
||||
cls.socket_patcher = patch('socket.socket', spec=socket.socket)
|
||||
cls.mock_socket = cls.socket_patcher.start()
|
||||
cls.mock_socket().connect_ex.return_value = 0
|
||||
|
||||
@@ -50,6 +50,7 @@ class LibevConnectionTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if LibevConnection is None:
|
||||
raise unittest.SkipTest('libev does not appear to be installed correctly')
|
||||
LibevConnection.initialize_reactor()
|
||||
|
||||
def make_connection(self):
|
||||
c = LibevConnection('1.2.3.4', cql_version='3.0.1')
|
||||
|
||||
Reference in New Issue
Block a user