From 420c8f9c960f32c786dda0a8c3cbae5e002aae12 Mon Sep 17 00:00:00 2001 From: Colin Stolley Date: Thu, 29 May 2014 22:29:07 -0500 Subject: [PATCH 01/26] Fix mixed statements and declarations, memory leak Cleaned up mixed statements and declarations which offend -Werror=declaration-after-statement (under python 3.4). Also fixed memory leak in Async_dealloc(). --- cassandra/io/libevwrapper.c | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/cassandra/io/libevwrapper.c b/cassandra/io/libevwrapper.c index ced72208..ab38ff16 100644 --- a/cassandra/io/libevwrapper.c +++ b/cassandra/io/libevwrapper.c @@ -133,7 +133,8 @@ IO_init(libevwrapper_IO *self, PyObject *args, PyObject *kwds) { PyObject *socket; PyObject *callback; PyObject *loop; - int io_flags = 0; + int io_flags = 0, fd = -1; + struct ev_io *io = NULL; if (!PyArg_ParseTuple(args, "OiOO", &socket, &io_flags, &loop, &callback)) { return -1; @@ -154,14 +155,14 @@ IO_init(libevwrapper_IO *self, PyObject *args, PyObject *kwds) { self->callback = callback; } - int fd = PyObject_AsFileDescriptor(socket); + fd = PyObject_AsFileDescriptor(socket); if (fd == -1) { PyErr_SetString(PyExc_TypeError, "unable to get file descriptor from socket"); Py_XDECREF(callback); Py_XDECREF(loop); return -1; } - struct ev_io *io = &(self->io); + io = &(self->io); ev_io_init(io, io_callback, fd, io_flags); self->io.data = self; return 0; @@ -246,6 +247,7 @@ typedef struct libevwrapper_Async { static void Async_dealloc(libevwrapper_Async *self) { + Py_XDECREF(self->loop); Py_TYPE(self)->tp_free((PyObject *)self); }; @@ -254,8 +256,9 @@ static void async_callback(EV_P_ ev_async *watcher, int revents) {}; static int Async_init(libevwrapper_Async *self, PyObject *args, PyObject *kwds) { PyObject *loop; - static char *kwlist[] = {"loop", NULL}; + struct ev_async *async = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O", kwlist, &loop)) { PyErr_SetString(PyExc_TypeError, "unable to get file 
descriptor from socket"); return -1; @@ -267,7 +270,7 @@ Async_init(libevwrapper_Async *self, PyObject *args, PyObject *kwds) { } else { return -1; } - struct ev_async *async = &(self->async); + async = &(self->async); ev_async_init(async, async_callback); return 0; }; @@ -345,11 +348,11 @@ Prepare_dealloc(libevwrapper_Prepare *self) { static void prepare_callback(struct ev_loop *loop, ev_prepare *watcher, int revents) { libevwrapper_Prepare *self = watcher->data; - + PyObject *result = NULL; PyGILState_STATE gstate; - gstate = PyGILState_Ensure(); - PyObject *result = PyObject_CallFunction(self->callback, "O", self); + gstate = PyGILState_Ensure(); + result = PyObject_CallFunction(self->callback, "O", self); if (!result) { PyErr_WriteUnraisable(self->callback); } @@ -362,6 +365,7 @@ static int Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) { PyObject *callback; PyObject *loop; + struct ev_prepare *prepare = NULL; if (!PyArg_ParseTuple(args, "OO", &loop, &callback)) { return -1; @@ -383,7 +387,7 @@ Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) { Py_INCREF(callback); self->callback = callback; } - struct ev_prepare *prepare = &(self->prepare); + prepare = &(self->prepare); ev_prepare_init(prepare, prepare_callback); self->prepare.data = self; return 0; @@ -478,6 +482,8 @@ void initlibevwrapper(void) #endif { + PyObject *module = NULL; + if (PyType_Ready(&libevwrapper_LoopType) < 0) INITERROR; @@ -494,9 +500,9 @@ initlibevwrapper(void) INITERROR; # if PY_MAJOR_VERSION >= 3 - PyObject *module = PyModule_Create(&moduledef); + module = PyModule_Create(&moduledef); # else - PyObject *module = Py_InitModule3("libevwrapper", module_methods, module_doc); + module = Py_InitModule3("libevwrapper", module_methods, module_doc); # endif if (module == NULL) From d69b8e62a5205ca76f4124b1994e54fb426f193c Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 30 May 2014 17:21:01 -0500 Subject: [PATCH 02/26] Avoid registering exit 
cleanups multiple times --- CHANGELOG.rst | 2 ++ cassandra/io/asyncorereactor.py | 28 +++++++++++++++++++++------- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 311693f9..35be8421 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,6 +6,8 @@ Bug Fixes * Add six to requirements.txt * Avoid KeyError during schema refresh when a keyspace is dropped and TokenAwarePolicy is not in use +* Avoid registering multiple atexit() cleanup functions will the + asyncore event loop is restarted multiple times 2.0.1 ===== diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 758458e0..9b7240dc 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -20,6 +20,7 @@ import os import socket import sys from threading import Event, Lock, Thread +import weakref from six import BytesIO from six.moves import range @@ -46,6 +47,15 @@ from cassandra.marshal import int32_unpack log = logging.getLogger(__name__) +def _cleanup(loop_weakref): + try: + loop = loop_weakref() + except ReferenceError: + return + + loop._cleanup() + + class AsyncoreLoop(object): def __init__(self): @@ -55,6 +65,8 @@ class AsyncoreLoop(object): self._conns_lock = Lock() self._conns = WeakSet() + self._thread = None + atexit.register(partial(_cleanup, weakref.ref(self))) def maybe_start(self): should_start = False @@ -69,10 +81,9 @@ class AsyncoreLoop(object): self._loop_lock.release() if should_start: - thread = Thread(target=self._run_loop, name="cassandra_driver_event_loop") - thread.daemon = True - thread.start() - atexit.register(partial(self._cleanup, thread)) + self._thread = Thread(target=self._run_loop, name="cassandra_driver_event_loop") + self._thread.daemon = True + self._thread.start() def _run_loop(self): log.debug("Starting asyncore event loop") @@ -95,11 +106,14 @@ class AsyncoreLoop(object): log.debug("Asyncore event loop ended") - def _cleanup(self, thread): + def _cleanup(self): 
self._shutdown = True + if not self._thread: + return + log.debug("Waiting for event loop thread to join...") - thread.join(timeout=1.0) - if thread.is_alive(): + self._thread.join(timeout=1.0) + if self._thread.is_alive(): log.warning( "Event loop thread could not be joined, so shutdown may not be clean. " "Please call Cluster.shutdown() to avoid this.") From b6d45894332498552cb733d5ba7caf9dd1d02945 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 3 Jun 2014 14:30:36 -0500 Subject: [PATCH 03/26] Move libev loop components into a class --- cassandra/io/libevreactor.py | 268 +++++++++++++++-------------- tests/unit/io/test_libevreactor.py | 2 +- 2 files changed, 144 insertions(+), 126 deletions(-) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 02eff6ff..5173fc80 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -19,6 +19,7 @@ import logging import os import socket from threading import Event, Lock, Thread +import weakref from six import BytesIO @@ -46,76 +47,162 @@ except ImportError: log = logging.getLogger(__name__) -class LibevConnection(Connection): - """ - An implementation of :class:`.Connection` that uses libev for its event loop. 
- """ - _loop = libev.Loop() - _loop_notifier = libev.Async(_loop) - _loop_notifier.start() +def _cleanup(loop_weakref): + try: + loop = loop_weakref() + except ReferenceError: + return - # prevent _loop_notifier from keeping the loop from returning - _loop.unref() + loop._cleanup() - _loop_started = None - _loop_lock = Lock() - _loop_shutdown = False - @classmethod - def _run_loop(cls): +class LibevLoop(object): + + def __init__(self): + self._loop = libev.Loop() + self._notifier = libev.Async(self._loop) + self._notifier.start() + + # prevent _notifier from keeping the loop from returning + self._loop.unref() + + self._started = False + self._shutdown = False + self._lock = Lock() + + self._thread = None + + # set of all connections; only replaced with a new copy + # while holding _conn_set_lock, never modified in place + self._live_conns = set() + # newly created connections that need their write/read watcher started + self._new_conns = set() + # recently closed connections that need their write/read watcher stopped + self._closed_conns = set() + self._conn_set_lock = Lock() + + self._preparer = libev.Prepare(self._loop, self._loop_will_run) + # prevent _preparer from keeping the loop from returning + self._loop.unref() + self._preparer.start() + + atexit.register(partial(_cleanup, weakref.ref(self))) + + def notify(self): + self._notifier.send() + + def maybe_start(self): + should_start = False + with self._lock: + if not self._started: + log.debug("Starting libev event loop") + self._started = True + should_start = True + + if should_start: + self._thread = Thread(target=self._run_loop, name="event_loop") + self._thread.daemon = True + self._thread.start() + + self._notifier.send() + + def _run_loop(self): while True: - end_condition = cls._loop.start() + end_condition = self._loop.start() # there are still active watchers, no deadlock - with cls._loop_lock: - if not cls._loop_shutdown and (end_condition or cls._live_conns): + with self._lock: + if not 
self._shutdown and (end_condition or self._live_conns): log.debug("Restarting event loop") continue else: # all Connections have been closed, no active watchers log.debug("All Connections currently closed, event loop ended") - cls._loop_started = False + self._started = False break - @classmethod - def _maybe_start_loop(cls): - should_start = False - with cls._loop_lock: - if not cls._loop_started: - log.debug("Starting libev event loop") - cls._loop_started = True - should_start = True + def _cleanup(self): + self._shutdown = True + if not self._thread: + return - if should_start: - t = Thread(target=cls._run_loop, name="event_loop") - t.daemon = True - t.start() - atexit.register(partial(cls._cleanup, t)) - - return should_start - - @classmethod - def _cleanup(cls, thread): - cls._loop_shutdown = True log.debug("Waiting for event loop thread to join...") - thread.join(timeout=1.0) - if thread.is_alive(): + self._thread.join(timeout=1.0) + if self._thread.is_alive(): log.warning( "Event loop thread could not be joined, so shutdown may not be clean. 
" "Please call Cluster.shutdown() to avoid this.") log.debug("Event loop thread was joined") - # class-level set of all connections; only replaced with a new copy - # while holding _conn_set_lock, never modified in place - _live_conns = set() - # newly created connections that need their write/read watcher started - _new_conns = set() - # recently closed connections that need their write/read watcher stopped - _closed_conns = set() - _conn_set_lock = Lock() + def connection_created(self, conn): + with self._conn_set_lock: + new_live_conns = self._live_conns.copy() + new_live_conns.add(conn) + self._live_conns = new_live_conns + + new_new_conns = self._new_conns.copy() + new_new_conns.add(conn) + self._new_conns = new_new_conns + + def connection_destroyed(self, conn): + with self._conn_set_lock: + new_live_conns = self._live_conns.copy() + new_live_conns.discard(conn) + self._live_conns = new_live_conns + + new_closed_conns = self._closed_conns.copy() + new_closed_conns.add(conn) + self._closed_conns = new_closed_conns + + self._notifier.send() + + def _loop_will_run(self, prepare): + changed = False + for conn in self._live_conns: + if not conn.deque and conn._write_watcher_is_active: + if conn._write_watcher: + conn._write_watcher.stop() + conn._write_watcher_is_active = False + changed = True + elif conn.deque and not conn._write_watcher_is_active: + conn._write_watcher.start() + conn._write_watcher_is_active = True + changed = True + + if self._new_conns: + with self._conn_set_lock: + to_start = self._new_conns + self._new_conns = set() + + for conn in to_start: + conn._read_watcher.start() + + changed = True + + if self._closed_conns: + with self._conn_set_lock: + to_stop = self._closed_conns + self._closed_conns = set() + + for conn in to_stop: + if conn._write_watcher: + conn._write_watcher.stop() + if conn._read_watcher: + conn._read_watcher.stop() + + changed = True + + if changed: + self._notifier.send() + + +class LibevConnection(Connection): + """ + An 
implementation of :class:`.Connection` that uses libev for its event loop. + """ + _libevloop = LibevLoop() _write_watcher_is_active = False - _total_reqd_bytes = 0 _read_watcher = None _write_watcher = None @@ -134,68 +221,6 @@ class LibevConnection(Connection): else: return conn - @classmethod - def _connection_created(cls, conn): - with cls._conn_set_lock: - new_live_conns = cls._live_conns.copy() - new_live_conns.add(conn) - cls._live_conns = new_live_conns - - new_new_conns = cls._new_conns.copy() - new_new_conns.add(conn) - cls._new_conns = new_new_conns - - @classmethod - def _connection_destroyed(cls, conn): - with cls._conn_set_lock: - new_live_conns = cls._live_conns.copy() - new_live_conns.discard(conn) - cls._live_conns = new_live_conns - - new_closed_conns = cls._closed_conns.copy() - new_closed_conns.add(conn) - cls._closed_conns = new_closed_conns - - @classmethod - def loop_will_run(cls, prepare): - changed = False - for conn in cls._live_conns: - if not conn.deque and conn._write_watcher_is_active: - if conn._write_watcher: - conn._write_watcher.stop() - conn._write_watcher_is_active = False - changed = True - elif conn.deque and not conn._write_watcher_is_active: - conn._write_watcher.start() - conn._write_watcher_is_active = True - changed = True - - if cls._new_conns: - with cls._conn_set_lock: - to_start = cls._new_conns - cls._new_conns = set() - - for conn in to_start: - conn._read_watcher.start() - - changed = True - - if cls._closed_conns: - with cls._conn_set_lock: - to_stop = cls._closed_conns - cls._closed_conns = set() - - for conn in to_stop: - if conn._write_watcher: - conn._write_watcher.stop() - if conn._read_watcher: - conn._read_watcher.stop() - - changed = True - - if changed: - cls._loop_notifier.send() - def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) @@ -219,17 +244,16 @@ class LibevConnection(Connection): for args in self.sockopts: self._socket.setsockopt(*args) - with self._loop_lock: - 
self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._loop, self.handle_read) - self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._loop, self.handle_write) + with self._libevloop._lock: + self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._libevloop._loop, self.handle_read) + self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._libevloop._loop, self.handle_write) self._send_options_message() - self.__class__._connection_created(self) + self._libevloop.connection_created(self) # start the global event loop if needed - self._maybe_start_loop() - self._loop_notifier.send() + self._libevloop.maybe_start() def close(self): with self.lock: @@ -238,9 +262,9 @@ class LibevConnection(Connection): self.is_closed = True log.debug("Closing connection (%s) to %s", id(self), self.host) - self.__class__._connection_destroyed(self) - self._loop_notifier.send() + self._libevloop.connection_destroyed(self) self._socket.close() + log.debug("Closed socket to %s", self.host) # don't leave in-progress operations hanging if not self.is_defunct: @@ -349,7 +373,7 @@ class LibevConnection(Connection): with self._deque_lock: self.deque.extend(chunks) - self._loop_notifier.send() + self._libevloop.notify() def register_watcher(self, event_type, callback, register_timeout=None): self._push_watchers[event_type].add(callback) @@ -361,9 +385,3 @@ class LibevConnection(Connection): self._push_watchers[event_type].add(callback) self.wait_for_response( RegisterMessage(event_list=type_callback_dict.keys()), timeout=register_timeout) - - -_preparer = libev.Prepare(LibevConnection._loop, LibevConnection.loop_will_run) -# prevent _preparer from keeping the loop from returning -LibevConnection._loop.unref() -_preparer.start() diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index 5fed53b4..ce3858df 100644 --- a/tests/unit/io/test_libevreactor.py +++ 
b/tests/unit/io/test_libevreactor.py @@ -44,7 +44,7 @@ except ImportError: @patch('cassandra.io.libevwrapper.IO') @patch('cassandra.io.libevwrapper.Prepare') @patch('cassandra.io.libevwrapper.Async') -@patch('cassandra.io.libevreactor.LibevConnection._maybe_start_loop') +@patch('cassandra.io.libevreactor.LibevLoop.maybe_start') class LibevConnectionTest(unittest.TestCase): def setUp(self): From 251188d572cb2ecb378a1936459bd2bbac771280 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 3 Jun 2014 14:54:07 -0500 Subject: [PATCH 04/26] Delay initialization of shared connection state This makes it much simpler to avoid sharing connection state across multiple processes. Fixes PYTHON-60 --- cassandra/cluster.py | 1 + cassandra/connection.py | 8 ++++++++ cassandra/io/asyncorereactor.py | 7 ++++++- cassandra/io/libevreactor.py | 8 ++++++-- tests/integration/standard/test_connection.py | 5 +++++ tests/unit/io/test_asyncorereactor.py | 1 + tests/unit/io/test_libevreactor.py | 1 + 7 files changed, 28 insertions(+), 3 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d3989d7a..bc5f0814 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -530,6 +530,7 @@ class Cluster(object): raise Exception("Cluster is already shut down") if not self._is_setup: + self.connection_class.initialize_reactor() atexit.register(partial(_shutdown_cluster, self)) for address in self.contact_points: host = self.add_host(address, signal=False) diff --git a/cassandra/connection.py b/cassandra/connection.py index a06adc2e..11a17f29 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -179,6 +179,14 @@ class Connection(object): self.lock = RLock() + @classmethod + def initialize_reactor(self): + """ + Called once by Cluster.connect(). This should be used by implementations + to set up any resources that will be shared across connections. 
+ """ + pass + def close(self): raise NotImplementedError() diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 9b7240dc..8c021a88 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -135,12 +135,17 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): module in the Python standard library for its event loop. """ - _loop = AsyncoreLoop() + _loop = None _total_reqd_bytes = 0 _writable = False _readable = False + @classmethod + def initialize_reactor(cls): + if not cls._loop: + cls._loop = AsyncoreLoop() + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 5173fc80..b1ed25b1 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -200,14 +200,18 @@ class LibevConnection(Connection): """ An implementation of :class:`.Connection` that uses libev for its event loop. """ - _libevloop = LibevLoop() - + _libevloop = None _write_watcher_is_active = False _total_reqd_bytes = 0 _read_watcher = None _write_watcher = None _socket = None + @classmethod + def initialize_reactor(cls): + if not cls._libevloop: + cls._libevloop = LibevLoop() + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index 79b0fde1..4d4fb8f3 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -39,6 +39,9 @@ class ConnectionTest(object): klass = None + def setUp(self): + self.klass.initialize_reactor() + def get_connection(self): """ Helper method to solve automated testing issues within Jenkins. 
@@ -216,6 +219,7 @@ class AsyncoreConnectionTest(ConnectionTest, unittest.TestCase): def setUp(self): if 'gevent.monkey' in sys.modules: raise unittest.SkipTest("Can't test libev with gevent monkey patching") + ConnectionTest.setUp(self) class LibevConnectionTest(ConnectionTest, unittest.TestCase): @@ -228,3 +232,4 @@ class LibevConnectionTest(ConnectionTest, unittest.TestCase): if LibevConnection is None: raise unittest.SkipTest( 'libev does not appear to be installed properly') + ConnectionTest.setUp(self) diff --git a/tests/unit/io/test_asyncorereactor.py b/tests/unit/io/test_asyncorereactor.py index 179fe637..bab800fa 100644 --- a/tests/unit/io/test_asyncorereactor.py +++ b/tests/unit/io/test_asyncorereactor.py @@ -42,6 +42,7 @@ class AsyncoreConnectionTest(unittest.TestCase): @classmethod def setUpClass(cls): + AsyncoreConnection.initialize_reactor() cls.socket_patcher = patch('socket.socket', spec=socket.socket) cls.mock_socket = cls.socket_patcher.start() cls.mock_socket().connect_ex.return_value = 0 diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index ce3858df..3b677ab5 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -50,6 +50,7 @@ class LibevConnectionTest(unittest.TestCase): def setUp(self): if LibevConnection is None: raise unittest.SkipTest('libev does not appear to be installed correctly') + LibevConnection.initialize_reactor() def make_connection(self): c = LibevConnection('1.2.3.4', cql_version='3.0.1') From d7cb0bc0a686bb7223e49bfafbe57080ad3649ba Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 3 Jun 2014 14:57:26 -0500 Subject: [PATCH 05/26] Update changelog --- CHANGELOG.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 35be8421..913a4fb6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,8 @@ Bug Fixes and TokenAwarePolicy is not in use * Avoid registering multiple atexit() cleanup functions will the asyncore 
event loop is restarted multiple times +* Delay initialization of reactors in order to avoid problems + with shared state when using multiprocessing (PYTHON-60) 2.0.1 ===== From a848d3bf81ebcaad4887b0b8c0557bf69c34d8ef Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 3 Jun 2014 15:18:10 -0500 Subject: [PATCH 06/26] Update debian dependencies for 2.0.x Fixes #133 --- CHANGELOG.rst | 2 ++ debian/changelog | 6 ++++++ debian/control | 8 ++++---- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 913a4fb6..121f72ae 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,6 +10,8 @@ Bug Fixes asyncore event loop is restarted multiple times * Delay initialization of reactors in order to avoid problems with shared state when using multiprocessing (PYTHON-60) +* Add python-six to debian dependencies, move python-blist to + recommends 2.0.1 ===== diff --git a/debian/changelog b/debian/changelog index c98ea8c9..2acd0df4 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +python-cassandra-driver (2.0.2~prerelease-1) unstable; urgency=low + + * Update dependencies for 2.0.x + + -- Tyler Hobbs Tue, 03 Jun 2014 15:16:53 -0500 + python-cassandra-driver (1.1.0~prerelease-1) unstable; urgency=low * Initial packaging diff --git a/debian/control b/debian/control index c13f4e86..6a698a8f 100644 --- a/debian/control +++ b/debian/control @@ -6,16 +6,16 @@ Build-Depends: python-all-dev (>= 2.6.6-3), python-all-dbg, debhelper (>= 9), python-sphinx (>= 1.0.7+dfsg) | python3-sphinx, libev-dev, python-concurrent.futures | python-futures, python-setuptools, python-nose, python-mock, python-yaml, python-gevent, - python-blist, python-tz + python-blist, python-tz, python-six (>= 1.6) X-Python-Version: >= 2.7 Standards-Version: 3.9.4 Package: python-cassandra-driver Architecture: any -Depends: ${misc:Depends}, ${python:Depends}, ${shlibs:Depends}, python-blist, - python-concurrent.futures | python-futures +Depends: 
${misc:Depends}, ${python:Depends}, ${shlibs:Depends}, + python-concurrent.futures | python-futures, python-six (>= 1.6) Provides: ${python:Provides} -Recommends: python-scales +Recommends: python-scales, python-blist Suggests: python-cassandra-driver-doc Description: Python driver for Apache Cassandra This driver works exclusively with the Cassandra Query Language v3 (CQL3) From 58b3c717d742bdc63b96cb61d5f2cb830fcfec62 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 3 Jun 2014 15:20:07 -0500 Subject: [PATCH 07/26] Add debian/control step to README-dev release steps Relates to #133 --- README-dev.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README-dev.rst b/README-dev.rst index 9a81395d..8c6a13bb 100644 --- a/README-dev.rst +++ b/README-dev.rst @@ -1,6 +1,8 @@ Releasing ========= * Run the tests and ensure they all pass +* If dependencies have changed, make sure ``debian/control`` + is up to date * Update CHANGELOG.rst * Update the version in ``cassandra/__init__.py`` * Commit the changelog and version changes From a6d3c06a2aa98a5014e52bfff2384e25f56383cb Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Fri, 6 Jun 2014 20:00:12 -0500 Subject: [PATCH 08/26] Adding wait_other_notice=True for ccm --- tests/integration/__init__.py | 4 ++-- tests/integration/standard/test_metrics.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 31240093..320e5a46 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -109,7 +109,7 @@ def setup_package(): cluster.populate(3) log.debug("Starting ccm test cluster") - cluster.start(wait_for_binary_proto=True) + cluster.start(wait_for_binary_proto=True, wait_other_notice=True) except Exception: log.exception("Failed to start ccm cluster:") raise @@ -134,7 +134,7 @@ def use_multidc(dc_list): cluster.populate(dc_list) log.debug("Starting ccm test cluster") - cluster.start(wait_for_binary_proto=True) + 
cluster.start(wait_for_binary_proto=True, wait_other_notice=True) except Exception: log.exception("Failed to start ccm cluster:") raise diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index aa39843a..9793aa22 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -51,7 +51,7 @@ class MetricsTests(unittest.TestCase): # Ensure the nodes are actually down self.assertRaises(NoHostAvailable, session.execute, "USE test3rf") finally: - get_cluster().start(wait_for_binary_proto=True) + get_cluster().start(wait_for_binary_proto=True, wait_other_notice=True) self.assertGreater(cluster.metrics.stats.connection_errors, 0) From 882fffb463240b655da3e08e38a6d370ac5d28fc Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 10 Jun 2014 11:24:23 -0500 Subject: [PATCH 09/26] Better messages and docs for Read/WriteTimeouts --- cassandra/__init__.py | 10 ++++++++++ cassandra/protocol.py | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 800e74e1..48f21947 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -172,6 +172,11 @@ class Timeout(Exception): class ReadTimeout(Timeout): """ A subclass of :exc:`Timeout` for read operations. + + This indicates that the replicas failed to respond to the coordinator + node before the configured timeout. This timeout is configured in + ``cassandra.yaml`` with the ``read_request_timeout_in_ms`` + and ``range_request_timeout_in_ms`` options. """ data_retrieved = None @@ -189,6 +194,11 @@ class ReadTimeout(Timeout): class WriteTimeout(Timeout): """ A subclass of :exc:`Timeout` for write operations. + + This indicates that the replicas failed to respond to the coordinator + node before the configured timeout. This timeout is configured in + ``cassandra.yaml`` with the ``write_request_timeout_in_ms`` + option. 
""" write_type = None diff --git a/cassandra/protocol.py b/cassandra/protocol.py index e9cb59f4..18b6d904 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -230,7 +230,7 @@ class TruncateError(RequestExecutionException): class WriteTimeoutErrorMessage(RequestExecutionException): - summary = 'Timeout during write request' + summary = 'Coordinator timeout waiting for replica response' error_code = 0x1100 @staticmethod @@ -247,7 +247,7 @@ class WriteTimeoutErrorMessage(RequestExecutionException): class ReadTimeoutErrorMessage(RequestExecutionException): - summary = 'Timeout during read request' + summary = 'Coordinator timeout waiting for replica response' error_code = 0x1200 @staticmethod From d0e040d7dde8edcb3e1162a9ed3af63ebb26ba78 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 10 Jun 2014 14:45:38 -0500 Subject: [PATCH 10/26] Fix memory leak when destroying libev connections The IO objects held a reference to their LibevConnection instances through the callback. This reference cycle was preventing the connection from being garbage collected. 
Fixes #93 --- CHANGELOG.rst | 2 ++ cassandra/io/libevreactor.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 121f72ae..396d2b2e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,8 @@ Bug Fixes with shared state when using multiprocessing (PYTHON-60) * Add python-six to debian dependencies, move python-blist to recommends +* Fix memory leak when libev connections are created and + destroyed (github #93) 2.0.1 ===== diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index b1ed25b1..c386e457 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -187,8 +187,12 @@ class LibevLoop(object): for conn in to_stop: if conn._write_watcher: conn._write_watcher.stop() + # clear reference cycles from IO callback + del conn._write_watcher if conn._read_watcher: conn._read_watcher.stop() + # clear reference cycles from IO callback + del conn._read_watcher changed = True From db98dde36cc2bdbe6371f35640ee9bbbd5de4269 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 10 Jun 2014 14:49:16 -0500 Subject: [PATCH 11/26] Minor tweak to Read/WriteTimeout error summaries --- cassandra/protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 18b6d904..8b98d20e 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -230,7 +230,7 @@ class TruncateError(RequestExecutionException): class WriteTimeoutErrorMessage(RequestExecutionException): - summary = 'Coordinator timeout waiting for replica response' + summary = "Coordinator node timed out waiting for replica nodes' responses" error_code = 0x1100 @staticmethod @@ -247,7 +247,7 @@ class WriteTimeoutErrorMessage(RequestExecutionException): class ReadTimeoutErrorMessage(RequestExecutionException): - summary = 'Coordinator timeout waiting for replica response' + summary = "Coordinator node timed out waiting for replica nodes' responses" error_code = 0x1200 
@staticmethod From 616eb26ae9ab7729eab8d0a05447feb7e793bf08 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 10 Jun 2014 15:57:55 -0500 Subject: [PATCH 12/26] Ensure token map is rebuilt when nodes are removed --- CHANGELOG.rst | 2 ++ cassandra/cluster.py | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 396d2b2e..2cb09b76 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,8 @@ Bug Fixes recommends * Fix memory leak when libev connections are created and destroyed (github #93) +* Ensure token map is rebuilt when hosts are removed from + the cluster 2.0.1 ===== diff --git a/cassandra/cluster.py b/cassandra/cluster.py index bc5f0814..ff7936ea 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1691,17 +1691,18 @@ class ControlConnection(object): else: self._cluster.metadata.rebuild_schema(ks_result, cf_result, col_result) - def refresh_node_list_and_token_map(self): + def refresh_node_list_and_token_map(self, force_token_rebuild=False): try: if self._connection: - self._refresh_node_list_and_token_map(self._connection) + self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild) except ReferenceError: pass # our weak reference to the Cluster is no good except Exception: log.debug("[control connection] Error refreshing node list and token map", exc_info=True) self._signal_error() - def _refresh_node_list_and_token_map(self, connection, preloaded_results=None): + def _refresh_node_list_and_token_map(self, connection, preloaded_results=None, + force_token_rebuild=False): if preloaded_results: log.debug("[control connection] Refreshing node list and token map using preloaded results") peers_result = preloaded_results[0] @@ -1736,7 +1737,7 @@ class ControlConnection(object): if partitioner and tokens: token_map[host] = tokens - should_rebuild_token_map = False + should_rebuild_token_map = force_token_rebuild found_hosts = set() for 
row in peers_result: addr = row.get("rpc_address") @@ -1762,12 +1763,11 @@ class ControlConnection(object): token_map[host] = tokens for old_host in self._cluster.metadata.all_hosts(): - if old_host.address != connection.host and \ - old_host.address not in found_hosts and \ - old_host.address not in self._cluster.contact_points: - log.debug("[control connection] Found host that has been removed: %r", old_host) + if old_host.address != connection.host and old_host.address not in found_hosts: should_rebuild_token_map = True - self._cluster.remove_host(old_host) + if old_host.address not in self._cluster.contact_points: + log.debug("[control connection] Found host that has been removed: %r", old_host) + self._cluster.remove_host(old_host) log.debug("[control connection] Finished fetching ring info") if partitioner and should_rebuild_token_map: @@ -1946,10 +1946,10 @@ class ControlConnection(object): self.reconnect() def on_add(self, host): - self.refresh_node_list_and_token_map() + self.refresh_node_list_and_token_map(force_token_rebuild=True) def on_remove(self, host): - self.refresh_node_list_and_token_map() + self.refresh_node_list_and_token_map(force_token_rebuild=True) def _stop_scheduler(scheduler, thread): From 88a7b5993f099197555577fdf3bf810624442e99 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 10 Jun 2014 16:22:54 -0500 Subject: [PATCH 13/26] Release 2.0.2 --- CHANGELOG.rst | 1 + cassandra/__init__.py | 2 +- debian/changelog | 6 ++++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2cb09b76..319534ec 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,5 +1,6 @@ 2.0.2 ===== +June 10, 2014 Bug Fixes --------- diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 48f21947..8d7c7031 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -23,7 +23,7 @@ class NullHandler(logging.Handler): logging.getLogger('cassandra').addHandler(NullHandler()) -__version_info__ = (2, 0, 1, 'post') 
+__version_info__ = (2, 0, 2) __version__ = '.'.join(map(str, __version_info__)) diff --git a/debian/changelog b/debian/changelog index 2acd0df4..0360d906 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +python-cassandra-driver (2.0.2-1) unstable; urgency=low + + * Release 2.0.2 + + -- Tyler Hobbs Tue, 10 Jun 2014 16:22:23 -0500 + python-cassandra-driver (2.0.2~prerelease-1) unstable; urgency=low * Update dependencies for 2.0.x From 04273cde9947de4025e06a74a9d30f9d286b307b Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 10 Jun 2014 16:25:00 -0500 Subject: [PATCH 14/26] Make version 2.0.2.post --- cassandra/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 8d7c7031..1462df37 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -23,7 +23,7 @@ class NullHandler(logging.Handler): logging.getLogger('cassandra').addHandler(NullHandler()) -__version_info__ = (2, 0, 2) +__version_info__ = (2, 0, 2, 'post') __version__ = '.'.join(map(str, __version_info__)) From 0ecc91ff25676a9bd1ff6320467520b8a5b8fd70 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 10 Jun 2014 16:32:05 -0500 Subject: [PATCH 15/26] Fix changelog typo --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 319534ec..fb545629 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,7 +7,7 @@ Bug Fixes * Add six to requirements.txt * Avoid KeyError during schema refresh when a keyspace is dropped and TokenAwarePolicy is not in use -* Avoid registering multiple atexit() cleanup functions will the +* Avoid registering multiple atexit cleanup functions when the asyncore event loop is restarted multiple times * Delay initialization of reactors in order to avoid problems with shared state when using multiprocessing (PYTHON-60) From 5476e8288553131b4698a46aa7cfb3e533503eff Mon Sep 17 00:00:00 2001 From: Dmitry Belaventsev Date: Wed, 
11 Jun 2014 11:45:56 +0700 Subject: [PATCH 16/26] dict_factory docstring fix --- cassandra/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/query.py b/cassandra/query.py index 74d565ae..52418a09 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -109,7 +109,7 @@ def dict_factory(colnames, rows): Example:: - >>> from cassandra.query import named_tuple_factory + >>> from cassandra.query import dict_factory >>> session = cluster.connect('mykeyspace') >>> session.row_factory = dict_factory >>> rows = session.execute("SELECT name, age FROM users LIMIT 1") From 321cf779e849f6d8a1ad2ae73ffb85dd4b68828f Mon Sep 17 00:00:00 2001 From: Dmitry Belaventsev Date: Wed, 11 Jun 2014 11:55:19 +0700 Subject: [PATCH 17/26] docstring fix --- cassandra/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/query.py b/cassandra/query.py index 52418a09..dddcd557 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -114,7 +114,7 @@ def dict_factory(colnames, rows): >>> session.row_factory = dict_factory >>> rows = session.execute("SELECT name, age FROM users LIMIT 1") >>> print rows[0] - {'age': 42, 'name': 'Bob'} + {u'age': 42, u'name': u'Bob'} .. versionchanged:: 2.0.0 moved from ``cassandra.decoder`` to ``cassandra.query`` From a199debd2f00c25abdea41faa89509717bf2892c Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Wed, 11 Jun 2014 00:09:14 -0700 Subject: [PATCH 18/26] Document exceptions moving from cassandra.decoder to cassandra.protocol --- CHANGELOG.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index fb545629..2d0c07f5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -114,6 +114,9 @@ now: * cassandra.decoder.dict_factory has moved to cassandra.query.dict_factory * cassandra.decoder.ordered_dict_factory has moved to cassandra.query.ordered_dict_factory +Exceptions that were in cassandra.decoder have been moved to cassandra.protocol. 
If +you handle any of these exceptions, you must adjust the code accordingly. + 1.1.2 ===== May 8, 2014 From fd3ae26c043bc43d4052f7768dcdadca3c22a84e Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 11 Jun 2014 13:12:55 -0500 Subject: [PATCH 19/26] Use six.moves for references to xrange Fixes #138 --- CHANGELOG.rst | 9 +++++++++ cassandra/io/geventreactor.py | 1 + cassandra/io/libevreactor.py | 1 + 3 files changed, 11 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2d0c07f5..e532a448 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,12 @@ +2.0.3 +===== +In Progress + +Bug Fixes +--------- +* Fix references to xrange that do not go through "six" in + libevreactor and geventreactor (github #138) + 2.0.2 ===== June 10, 2014 diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py index cefe4b8e..27f24328 100644 --- a/cassandra/io/geventreactor.py +++ b/cassandra/io/geventreactor.py @@ -22,6 +22,7 @@ from functools import partial import logging import os +from six.moves import xrange try: from cStringIO import StringIO except ImportError: diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index c386e457..05b6863b 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -22,6 +22,7 @@ from threading import Event, Lock, Thread import weakref from six import BytesIO +from six.moves import xrange from cassandra import OperationTimedOut from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING From fe9d0f2d26afdb23190f190ceef078c508a35899 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 11 Jun 2014 18:57:45 -0500 Subject: [PATCH 20/26] BoundStmt should inherit fetch_size from PreparedStmt Fixes PYTHON-80 --- CHANGELOG.rst | 2 ++ cassandra/query.py | 1 + tests/unit/test_parameter_binding.py | 18 ++++++++++++++++++ 3 files changed, 21 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e532a448..eb774c3a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst 
@@ -6,6 +6,8 @@ Bug Fixes --------- * Fix references to xrange that do not go through "six" in libevreactor and geventreactor (github #138) +* Make BoundStatements inherit fetch_size from their parent + PreparedStatement (PYTHON-80) 2.0.2 ===== diff --git a/cassandra/query.py b/cassandra/query.py index dddcd557..c3cc8de6 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -392,6 +392,7 @@ class BoundStatement(Statement): """ self.consistency_level = prepared_statement.consistency_level self.serial_consistency_level = prepared_statement.serial_consistency_level + self.fetch_size = prepared_statement.fetch_size self.prepared_statement = prepared_statement self.values = [] diff --git a/tests/unit/test_parameter_binding.py b/tests/unit/test_parameter_binding.py index b9edbbe3..01934caa 100644 --- a/tests/unit/test_parameter_binding.py +++ b/tests/unit/test_parameter_binding.py @@ -107,3 +107,21 @@ class BoundStatementTestCase(unittest.TestCase): self.assertIn('list', str(e)) else: self.fail('Passed invalid type but exception was not thrown') + + def test_inherit_fetch_size(self): + keyspace = 'keyspace1' + column_family = 'cf1' + + column_metadata = [ + (keyspace, column_family, 'foo1', Int32Type), + (keyspace, column_family, 'foo2', Int32Type) + ] + + prepared_statement = PreparedStatement(column_metadata=column_metadata, + query_id=None, + routing_key_indexes=[], + query=None, + keyspace=keyspace, + fetch_size=1234) + bound_statement = BoundStatement(prepared_statement=prepared_statement) + self.assertEqual(1234, bound_statement.fetch_size) From f060384b6adff5cc4443d6a85508c72d3c012203 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 12 Jun 2014 13:01:08 -0500 Subject: [PATCH 21/26] Use ev_loop_new and ev_loop_destroy Related to #141 --- cassandra/io/libevwrapper.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cassandra/io/libevwrapper.c b/cassandra/io/libevwrapper.c index ab38ff16..cbac83b2 100644 --- 
a/cassandra/io/libevwrapper.c +++ b/cassandra/io/libevwrapper.c @@ -8,6 +8,7 @@ typedef struct libevwrapper_Loop { static void Loop_dealloc(libevwrapper_Loop *self) { + ev_loop_destroy(self->loop); Py_TYPE(self)->tp_free((PyObject *)self); }; @@ -17,9 +18,9 @@ Loop_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { self = (libevwrapper_Loop *)type->tp_alloc(type, 0); if (self != NULL) { - self->loop = ev_default_loop(EVBACKEND_SELECT); + self->loop = ev_loop_new(EVBACKEND_SELECT); if (!self->loop) { - PyErr_SetString(PyExc_Exception, "Error getting default ev loop"); + PyErr_SetString(PyExc_Exception, "Error getting new ev loop"); Py_DECREF(self); return NULL; } From c277202ac906961b62061736abca5317cdecaf93 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 12 Jun 2014 13:07:33 -0500 Subject: [PATCH 22/26] Cleanup live watchers during libev loop cleanup Relates to #141 --- cassandra/io/libevreactor.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 05b6863b..141ddd57 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -126,6 +126,15 @@ class LibevLoop(object): if not self._thread: return + for conn in self._live_conns | self._new_conns | self._closed_conns: + conn.close() + if conn._write_watcher: + conn._write_watcher.stop() + del conn._write_watcher + if conn._read_watcher: + conn._read_watcher.stop() + del conn._read_watcher + log.debug("Waiting for event loop thread to join...") self._thread.join(timeout=1.0) if self._thread.is_alive(): @@ -134,6 +143,7 @@ class LibevLoop(object): "Please call Cluster.shutdown() to avoid this.") log.debug("Event loop thread was joined") + self._loop = None def connection_created(self, conn): with self._conn_set_lock: From cd9ff7077b1ca91e9661f744cab9d20545c788dd Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 12 Jun 2014 13:08:01 -0500 Subject: [PATCH 23/26] Add methods for cleaning up after forking 
Relates to #141 --- cassandra/connection.py | 8 ++++++++ cassandra/io/asyncorereactor.py | 6 ++++++ cassandra/io/libevreactor.py | 6 ++++++ 3 files changed, 20 insertions(+) diff --git a/cassandra/connection.py b/cassandra/connection.py index 11a17f29..39c58ed4 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -187,6 +187,14 @@ class Connection(object): """ pass + @classmethod + def handle_fork(self): + """ + Called after a forking. This should cleanup any remaining reactor state + from the parent process. + """ + pass + def close(self): raise NotImplementedError() diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 8c021a88..6d4a0ba4 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -146,6 +146,12 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): if not cls._loop: cls._loop = AsyncoreLoop() + @classmethod + def handle_fork(cls): + if cls._loop: + cls._loop._cleanup() + cls._loop = None + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 141ddd57..fd90e6a9 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -227,6 +227,12 @@ class LibevConnection(Connection): if not cls._libevloop: cls._libevloop = LibevLoop() + @classmethod + def handle_fork(cls): + if cls._libevloop: + cls._libevloop._cleanup() + cls._libevloop = None + @classmethod def factory(cls, *args, **kwargs): timeout = kwargs.pop('timeout', 5.0) From 5b57a85f82f6b5c50ae7e8842db5ebdf246e0c85 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 12 Jun 2014 13:41:22 -0500 Subject: [PATCH 24/26] Detect fork in child proc, reinitialize reactor state --- CHANGELOG.rst | 3 +++ cassandra/io/asyncorereactor.py | 7 +++++++ cassandra/io/libevreactor.py | 6 ++++++ 3 files changed, 16 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index eb774c3a..e835948b 100644 --- 
a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,9 @@ Bug Fixes libevreactor and geventreactor (github #138) * Make BoundStatements inherit fetch_size from their parent PreparedStatement (PYTHON-80) +* Clear reactor state in child process after forking + to prevent errors with multiprocessing when the parent + process has connected a Cluster before forking (github #141) 2.0.2 ===== diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 6d4a0ba4..be73187b 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -59,6 +59,7 @@ def _cleanup(loop_weakref): class AsyncoreLoop(object): def __init__(self): + self._pid = os.getpid() self._loop_lock = Lock() self._started = False self._shutdown = False @@ -145,6 +146,12 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): def initialize_reactor(cls): if not cls._loop: cls._loop = AsyncoreLoop() + else: + current_pid = os.getpid() + if cls._loop._pid != current_pid: + log.debug("Detected fork, clearing and reinitializing reactor state") + cls.handle_fork() + cls._loop = AsyncoreLoop() @classmethod def handle_fork(cls): diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index fd90e6a9..23da912d 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -60,6 +60,7 @@ def _cleanup(loop_weakref): class LibevLoop(object): def __init__(self): + self._pid = os.getpid() self._loop = libev.Loop() self._notifier = libev.Async(self._loop) self._notifier.start() @@ -226,6 +227,11 @@ class LibevConnection(Connection): def initialize_reactor(cls): if not cls._libevloop: cls._libevloop = LibevLoop() + else: + if cls._libevloop._pid != os.getpid(): + log.debug("Detected fork, clearing and reinitializing reactor state") + cls.handle_fork() + cls._libevloop = LibevLoop() @classmethod def handle_fork(cls): From 38306470bf0fc3f4499bd7e1c2d1eba987325647 Mon Sep 17 00:00:00 2001 From: Colin Stolley Date: Sat, 14 Jun 2014 13:09:14 -0500 
Subject: [PATCH 25/26] Replace six.BytesIO with io.BytesIO for speed-up On python 2.6 and 2.7, six.BytesIO maps to StringIO, which is slow: https://github.com/kelp404/six/blob/master/six.py#L478 Python 2.6 and 2.7 provide the io module (for compatibility with the Python 3 io module), which defines a faster BytesIO implementation. cProfile reveals a significant speedup when using the io module. Using six.BytesIO: ncalls tottime percall cumtime percall filename:lineno(function) ... 2277792 9.736 0.000 13.088 0.000 /usr/lib64/python2.6/StringIO.py:208(write) However with io.BytesIO: 2277792 0.920 0.000 0.920 0.000 {method 'write' of '_bytesio._BytesIO' objects} This will break on Python 2.5, but I think that's ok. --- cassandra/protocol.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 8b98d20e..9b17eb68 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from __future__ import absolute_import # to enable import io from stdlib import logging import socket from uuid import UUID import six from six.moves import range +import io from cassandra import (Unavailable, WriteTimeout, ReadTimeout, AlreadyExists, InvalidRequest, Unauthorized, @@ -67,7 +69,7 @@ class _MessageType(object): tracing = False def to_binary(self, stream_id, protocol_version, compression=None): - body = six.BytesIO() + body = io.BytesIO() self.send_body(body, protocol_version) body = body.getvalue() @@ -78,7 +80,7 @@ class _MessageType(object): if self.tracing: flags |= TRACING_FLAG - msg = six.BytesIO() + msg = io.BytesIO() write_header( msg, protocol_version | HEADER_DIRECTION_FROM_CLIENT, @@ -107,7 +109,7 @@ def decode_response(stream_id, flags, opcode, body, decompressor=None): body = decompressor(body) flags ^= COMPRESSED_FLAG - body = six.BytesIO(body) + body = io.BytesIO(body) if flags & TRACING_FLAG: trace_id = UUID(bytes=body.read(16)) flags ^= TRACING_FLAG From 48582f447698307725156e9196369dab083c730a Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 17 Jun 2014 11:02:32 -0500 Subject: [PATCH 26/26] Use BytesIO for reduced CPU consumption Relates to #143 --- CHANGELOG.rst | 4 ++++ cassandra/io/asyncorereactor.py | 7 ++++--- cassandra/io/geventreactor.py | 10 ++++------ cassandra/io/libevreactor.py | 7 ++++--- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e835948b..b4854f68 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,10 @@ ===== In Progress +Features +-------- +* Use io.BytesIO for reduced CPU consumption (github #143) + Bug Fixes --------- * Fix references to xrange that do not go through "six" in diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index be73187b..0cbad004 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # 
limitations under the License. +from __future__ import absolute_import # to enable import io from stdlib import atexit from collections import deque from functools import partial +import io import logging import os import socket @@ -22,7 +24,6 @@ import sys from threading import Event, Lock, Thread import weakref -from six import BytesIO from six.moves import range from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL, EISCONN, errorcode @@ -177,7 +178,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): asyncore.dispatcher.__init__(self) self.connected_event = Event() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._callbacks = {} self.deque = deque() @@ -330,7 +331,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): # leave leftover in current buffer leftover = self._iobuf.read() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._iobuf.write(leftover) self._total_reqd_bytes = 0 diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py index 27f24328..93ebab82 100644 --- a/cassandra/io/geventreactor.py +++ b/cassandra/io/geventreactor.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from __future__ import absolute_import # to enable import io from stdlib import gevent from gevent import select, socket from gevent.event import Event @@ -19,14 +20,11 @@ from gevent.queue import Queue from collections import defaultdict from functools import partial +import io import logging import os from six.moves import xrange -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO # ignore flake8 warning: # NOQA from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL @@ -73,7 +71,7 @@ class GeventConnection(Connection): Connection.__init__(self, *args, **kwargs) self.connected_event = Event() - self._iobuf = StringIO() + self._iobuf = io.BytesIO() self._write_queue = Queue() self._callbacks = {} @@ -179,7 +177,7 @@ class GeventConnection(Connection): # leave leftover in current buffer leftover = self._iobuf.read() - self._iobuf = StringIO() + self._iobuf = io.BytesIO() self._iobuf.write(leftover) self._total_reqd_bytes = 0 diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 23da912d..9c2d93d9 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -12,16 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from __future__ import absolute_import # to enable import io from stdlib import atexit from collections import deque from functools import partial +import io import logging import os import socket from threading import Event, Lock, Thread import weakref -from six import BytesIO from six.moves import xrange from cassandra import OperationTimedOut @@ -256,7 +257,7 @@ class LibevConnection(Connection): Connection.__init__(self, *args, **kwargs) self.connected_event = Event() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._callbacks = {} self.deque = deque() @@ -381,7 +382,7 @@ class LibevConnection(Connection): # leave leftover in current buffer leftover = self._iobuf.read() - self._iobuf = BytesIO() + self._iobuf = io.BytesIO() self._iobuf.write(leftover) self._total_reqd_bytes = 0