diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d3989d7a..bc5f0814 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -530,6 +530,7 @@ class Cluster(object): raise Exception("Cluster is already shut down") if not self._is_setup: + self.connection_class.initialize_reactor() atexit.register(partial(_shutdown_cluster, self)) for address in self.contact_points: host = self.add_host(address, signal=False) diff --git a/cassandra/connection.py b/cassandra/connection.py index a06adc2e..11a17f29 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -179,6 +179,14 @@ class Connection(object): self.lock = RLock() + @classmethod + def initialize_reactor(self): + """ + Called once by Cluster.connect(). This should be used by implementations + to set up any resources that will be shared across connections. + """ + pass + def close(self): raise NotImplementedError() diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 9b7240dc..8c021a88 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -135,12 +135,17 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): module in the Python standard library for its event loop. """ - _loop = AsyncoreLoop() + _loop = None _total_reqd_bytes = 0 _writable = False _readable = False + @classmethod + def initialize_reactor(cls): + if not cls._loop: + cls._loop = AsyncoreLoop() + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 5173fc80..b1ed25b1 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -200,14 +200,18 @@ class LibevConnection(Connection): """ An implementation of :class:`.Connection` that uses libev for its event loop. """ - _libevloop = LibevLoop() - + _libevloop = None _write_watcher_is_active = False _total_reqd_bytes = 0 _read_watcher = None _write_watcher = None _socket = None + @classmethod + def initialize_reactor(cls): + if not cls._libevloop: + cls._libevloop = LibevLoop() + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index 79b0fde1..4d4fb8f3 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -39,6 +39,9 @@ class ConnectionTest(object): klass = None + def setUp(self): + self.klass.initialize_reactor() + def get_connection(self): """ Helper method to solve automated testing issues within Jenkins. @@ -216,6 +219,7 @@ class AsyncoreConnectionTest(ConnectionTest, unittest.TestCase): def setUp(self): if 'gevent.monkey' in sys.modules: raise unittest.SkipTest("Can't test libev with gevent monkey patching") + ConnectionTest.setUp(self) class LibevConnectionTest(ConnectionTest, unittest.TestCase): @@ -228,3 +232,4 @@ class LibevConnectionTest(ConnectionTest, unittest.TestCase): if LibevConnection is None: raise unittest.SkipTest( 'libev does not appear to be installed properly') + ConnectionTest.setUp(self) diff --git a/tests/unit/io/test_asyncorereactor.py b/tests/unit/io/test_asyncorereactor.py index 179fe637..bab800fa 100644 --- a/tests/unit/io/test_asyncorereactor.py +++ b/tests/unit/io/test_asyncorereactor.py @@ -42,6 +42,7 @@ class AsyncoreConnectionTest(unittest.TestCase): @classmethod def setUpClass(cls): + AsyncoreConnection.initialize_reactor() cls.socket_patcher = patch('socket.socket', spec=socket.socket) cls.mock_socket = cls.socket_patcher.start() cls.mock_socket().connect_ex.return_value = 0 diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index ce3858df..3b677ab5 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -50,6 +50,7 @@ class LibevConnectionTest(unittest.TestCase): def setUp(self): if LibevConnection is None: raise unittest.SkipTest('libev does not appear to be installed correctly') + LibevConnection.initialize_reactor() def make_connection(self): c = LibevConnection('1.2.3.4', cql_version='3.0.1')