diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 5aa52f9e..ae56f9b7 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -87,6 +87,17 @@ class LibevConnection(Connection): An implementation of :class:`.Connection` that utilizes libev. """ + # class-level set of all connections; only replaced with a new copy + # while holding _conn_set_lock, never modified in place + _live_conns = set() + # newly created connections that need their write/read watcher started + _new_conns = set() + # recently closed connections that need their write/read watcher stopped + _closed_conns = set() + _conn_set_lock = Lock() + + _write_watcher_is_active = False + _total_reqd_bytes = 0 _read_watcher = None _write_watcher = None @@ -105,6 +116,65 @@ class LibevConnection(Connection): else: return conn + @classmethod + def _connection_created(cls, conn): + with cls._conn_set_lock: + new_live_conns = cls._live_conns.copy() + new_live_conns.add(conn) + cls._live_conns = new_live_conns + + new_new_conns = cls._new_conns.copy() + new_new_conns.add(conn) + cls._new_conns = new_new_conns + + @classmethod + def _connection_destroyed(cls, conn): + with cls._conn_set_lock: + new_live_conns = cls._live_conns.copy() + new_live_conns.discard(conn) + cls._live_conns = new_live_conns + + new_closed_conns = cls._closed_conns.copy() + new_closed_conns.add(conn) + cls._closed_conns = new_closed_conns + + @classmethod + def loop_will_run(cls, prepare): + changed = False + for conn in cls._live_conns: + if not conn.deque and conn._write_watcher_is_active: + conn._write_watcher.stop() + conn._write_watcher_is_active = False + changed = True + elif conn.deque and not conn._write_watcher_is_active: + conn._write_watcher.start() + conn._write_watcher_is_active = True + changed = True + + if cls._new_conns: + with cls._conn_set_lock: + to_start = cls._new_conns + cls._new_conns = set() + + for conn in to_start: + conn._read_watcher.start() + + changed = True + + if cls._closed_conns: + with cls._conn_set_lock: + to_stop = cls._closed_conns + cls._closed_conns = set() + + for conn in to_stop: + conn._write_watcher.stop() + conn._read_watcher.stop() + + changed = True + + if changed: + _loop_notifier.send() + def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) @@ -129,19 +199,17 @@ class LibevConnection(Connection): for args in self.sockopts: self._socket.setsockopt(*args) - self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, _loop, self.handle_read) - self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, _loop, self.handle_write) with _loop_lock: - self._read_watcher.start() - self._write_watcher.start() + self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, _loop, self.handle_read) + self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, _loop, self.handle_write) self._send_options_message() + self.__class__._connection_created(self) + # start the global event loop if needed - if not _start_loop(): - # if the loop was already started, notify it - with _loop_lock: - _loop_notifier.send() + _start_loop() + _loop_notifier.send() def close(self): with self.lock: @@ -150,14 +218,9 @@ class LibevConnection(Connection): self.is_closed = True log.debug("Closing connection (%s) to %s", id(self), self.host) - with _loop_lock: - if self._read_watcher: - self._read_watcher.stop() - if self._write_watcher: - self._write_watcher.stop() + self.__class__._connection_destroyed(self) + _loop_notifier.send() self._socket.close() - with _loop_lock: - _loop_notifier.send() # don't leave in-progress operations hanging if not self.is_defunct: @@ -205,11 +268,6 @@ class LibevConnection(Connection): with self._deque_lock: next_msg = self.deque.popleft() except IndexError: - with self._deque_lock: - if not self.deque: - with _loop_lock: - if self._write_watcher.is_active(): - self._write_watcher.stop() return try: @@ -218,7 +276,6 @@ class LibevConnection(Connection): if (err.args[0] in NONBLOCKING): with self._deque_lock: self.deque.appendleft(next_msg) - _loop_notifier.send() else: self.defunct(err) return @@ -226,14 +283,6 @@ class LibevConnection(Connection): if sent < len(next_msg): with self._deque_lock: self.deque.appendleft(next_msg[sent:]) - _loop_notifier.send() - elif not self.deque: - with self._deque_lock: - if not self.deque: - with _loop_lock: - if self._write_watcher.is_active(): - self._write_watcher.stop() - return def handle_read(self, watcher, revents): if revents & libev.EV_ERROR: @@ -305,10 +354,6 @@ class LibevConnection(Connection): with self._deque_lock: self.deque.extend(chunks) - - with _loop_lock: - if not self._write_watcher.is_active(): - self._write_watcher.start() _loop_notifier.send() def send_msg(self, msg, cb, wait_for_id=False): @@ -372,3 +417,9 @@ class LibevConnection(Connection): for event_type, callback in type_callback_dict.items(): self._push_watchers[event_type].add(callback) self.wait_for_response(RegisterMessage(event_list=type_callback_dict.keys())) + + +_preparer = libev.Prepare(_loop, LibevConnection.loop_will_run) +# prevent _preparer from keeping the loop from returning +_loop.unref() +_preparer.start() diff --git a/cassandra/io/libevwrapper.c b/cassandra/io/libevwrapper.c index 59805a56..6bb1db2d 100644 --- a/cassandra/io/libevwrapper.c +++ b/cassandra/io/libevwrapper.c @@ -339,6 +339,123 @@ static PyTypeObject libevwrapper_AsyncType = { (initproc)Async_init, /* tp_init */ }; +typedef struct libevwrapper_Prepare { + PyObject_HEAD + struct ev_prepare prepare; + struct libevwrapper_Loop *loop; + PyObject *callback; +} libevwrapper_Prepare; + +static void +Prepare_dealloc(libevwrapper_Prepare *self) { + Py_XDECREF(self->loop); + Py_XDECREF(self->callback); + self->ob_type->tp_free((PyObject *)self); +} + +static void prepare_callback(struct ev_loop *loop, ev_prepare *watcher, int revents) { + libevwrapper_Prepare *self = watcher->data; + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + PyObject *result = PyObject_CallFunction(self->callback, "O", self); + if (!result) { + PyErr_WriteUnraisable(self->callback); + } + Py_XDECREF(result); + + PyGILState_Release(gstate); +} + +static int +Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) { + PyObject *callback; + PyObject *loop; + + if (!PyArg_ParseTuple(args, "OO", &loop, &callback)) { + return -1; + } + + if (loop) { + Py_INCREF(loop); + self->loop = (libevwrapper_Loop *)loop; + } else { + return -1; + } + + if (callback) { + if (!PyCallable_Check(callback)) { + PyErr_SetString(PyExc_TypeError, "callback parameter must be callable"); + Py_XDECREF(loop); + return -1; + } + Py_INCREF(callback); + self->callback = callback; + } + ev_prepare_init(&self->prepare, prepare_callback); + self->prepare.data = self; + return 0; +} + +static PyObject * +Prepare_start(libevwrapper_Prepare *self, PyObject *args) { + ev_prepare_start(self->loop->loop, &self->prepare); + Py_RETURN_NONE; +} + +static PyObject * +Prepare_stop(libevwrapper_Prepare *self, PyObject *args) { + ev_prepare_stop(self->loop->loop, &self->prepare); + Py_RETURN_NONE; +} + +static PyMethodDef Prepare_methods[] = { + {"start", (PyCFunction)Prepare_start, METH_NOARGS, "Start the Prepare watcher"}, + {"stop", (PyCFunction)Prepare_stop, METH_NOARGS, "Stop the Prepare watcher"}, + {NULL} /* Sentinal */ +}; + +static PyTypeObject libevwrapper_PrepareType = { + PyObject_HEAD_INIT(NULL) + 0, /*ob_size*/ + "cassandra.io.libevwrapper.Prepare", /*tp_name*/ + sizeof(libevwrapper_Prepare), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)Prepare_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/ + "Prepare objects", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + Prepare_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)Prepare_init, /* tp_init */ +}; + static PyMethodDef module_methods[] = { {NULL} /* Sentinal */ }; @@ -359,6 +476,10 @@ initlibevwrapper(void) if (PyType_Ready(&libevwrapper_IOType) < 0) return; + libevwrapper_PrepareType.tp_new = PyType_GenericNew; + if (PyType_Ready(&libevwrapper_PrepareType) < 0) + return; + libevwrapper_AsyncType.tp_new = PyType_GenericNew; if (PyType_Ready(&libevwrapper_AsyncType) < 0) return; @@ -380,6 +501,10 @@ initlibevwrapper(void) if (PyModule_AddObject(m, "IO", (PyObject *)&libevwrapper_IOType) == -1) return; + Py_INCREF(&libevwrapper_PrepareType); + if (PyModule_AddObject(m, "Prepare", (PyObject *)&libevwrapper_PrepareType) == -1) + return; + Py_INCREF(&libevwrapper_AsyncType); if (PyModule_AddObject(m, "Async", (PyObject *)&libevwrapper_AsyncType) == -1) return; diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index da9098d6..7f9f2615 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -24,6 +24,8 @@ except ImportError as exc: @patch('socket.socket') @patch('cassandra.io.libevwrapper.IO') +@patch('cassandra.io.libevwrapper.Prepare') +@patch('cassandra.io.libevwrapper.Async') @patch('cassandra.io.libevreactor._start_loop') class LibevConnectionTest(unittest.TestCase):