Threadsafe method for updating watcher status

This commit is contained in:
Tyler Hobbs
2013-12-26 17:17:52 -06:00
parent b090b555f5
commit 01c19ab3ba
3 changed files with 211 additions and 33 deletions

View File

@@ -87,6 +87,17 @@ class LibevConnection(Connection):
An implementation of :class:`.Connection` that utilizes libev.
"""
# class-level set of all connections; only replaced with a new copy
# while holding _conn_set_lock, never modified in place
_live_conns = set()
# newly created connections that need their write/read watcher started
_new_conns = set()
# recently closed connections that need their write/read watcher stopped
_closed_conns = set()
_conn_set_lock = Lock()
_write_watcher_is_active = False
_total_reqd_bytes = 0
_read_watcher = None
_write_watcher = None
@@ -105,6 +116,65 @@ class LibevConnection(Connection):
else:
return conn
@classmethod
def _connection_created(cls, conn):
with cls._conn_set_lock:
new_live_conns = cls._live_conns.copy()
new_live_conns.add(conn)
cls._live_conns = new_live_conns
new_new_conns = cls._new_conns.copy()
new_new_conns.add(conn)
cls._new_conns = new_new_conns
@classmethod
def _connection_destroyed(cls, conn):
with cls._conn_set_lock:
new_live_conns = cls._live_conns.copy()
new_live_conns.discard(conn)
cls._live_conns = new_live_conns
new_closed_conns = cls._closed_conns.copy()
new_closed_conns.add(conn)
cls._closed_conns = new_closed_conns
@classmethod
def loop_will_run(cls, prepare):
changed = False
for conn in cls._live_conns:
if not conn.deque and conn._write_watcher_is_active:
conn._write_watcher.stop()
conn._write_watcher_is_active = False
changed = True
elif conn.deque and not conn._write_watcher_is_active:
conn._write_watcher.start()
conn._write_watcher_is_active = True
changed = True
if cls._new_conns:
with cls._conn_set_lock:
to_start = cls._new_conns
cls._new_conns = set()
for conn in to_start:
conn._read_watcher.start()
changed = True
if cls._closed_conns:
with cls._conn_set_lock:
to_stop = cls._closed_conns
cls._closed_conns = set()
for conn in to_stop:
conn._write_watcher.stop()
conn._read_watcher.stop()
changed = True
if changed:
_loop_notifier.send()
def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)
@@ -129,19 +199,17 @@ class LibevConnection(Connection):
for args in self.sockopts:
self._socket.setsockopt(*args)
self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, _loop, self.handle_read)
self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, _loop, self.handle_write)
with _loop_lock:
self._read_watcher.start()
self._write_watcher.start()
self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, _loop, self.handle_read)
self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, _loop, self.handle_write)
self._send_options_message()
self.__class__._connection_created(self)
# start the global event loop if needed
if not _start_loop():
# if the loop was already started, notify it
with _loop_lock:
_loop_notifier.send()
_start_loop()
_loop_notifier.send()
def close(self):
with self.lock:
@@ -150,14 +218,9 @@ class LibevConnection(Connection):
self.is_closed = True
log.debug("Closing connection (%s) to %s", id(self), self.host)
with _loop_lock:
if self._read_watcher:
self._read_watcher.stop()
if self._write_watcher:
self._write_watcher.stop()
self.__class__._connection_destroyed(self)
_loop_notifier.send()
self._socket.close()
with _loop_lock:
_loop_notifier.send()
# don't leave in-progress operations hanging
if not self.is_defunct:
@@ -205,11 +268,6 @@ class LibevConnection(Connection):
with self._deque_lock:
next_msg = self.deque.popleft()
except IndexError:
with self._deque_lock:
if not self.deque:
with _loop_lock:
if self._write_watcher.is_active():
self._write_watcher.stop()
return
try:
@@ -218,7 +276,6 @@ class LibevConnection(Connection):
if (err.args[0] in NONBLOCKING):
with self._deque_lock:
self.deque.appendleft(next_msg)
_loop_notifier.send()
else:
self.defunct(err)
return
@@ -226,14 +283,6 @@ class LibevConnection(Connection):
if sent < len(next_msg):
with self._deque_lock:
self.deque.appendleft(next_msg[sent:])
_loop_notifier.send()
elif not self.deque:
with self._deque_lock:
if not self.deque:
with _loop_lock:
if self._write_watcher.is_active():
self._write_watcher.stop()
return
def handle_read(self, watcher, revents):
if revents & libev.EV_ERROR:
@@ -305,10 +354,6 @@ class LibevConnection(Connection):
with self._deque_lock:
self.deque.extend(chunks)
with _loop_lock:
if not self._write_watcher.is_active():
self._write_watcher.start()
_loop_notifier.send()
def send_msg(self, msg, cb, wait_for_id=False):
@@ -372,3 +417,9 @@ class LibevConnection(Connection):
for event_type, callback in type_callback_dict.items():
self._push_watchers[event_type].add(callback)
self.wait_for_response(RegisterMessage(event_list=type_callback_dict.keys()))
_preparer = libev.Prepare(_loop, LibevConnection.loop_will_run)
# prevent _preparer from keeping the loop from returning
_loop.unref()
_preparer.start()

View File

@@ -339,6 +339,123 @@ static PyTypeObject libevwrapper_AsyncType = {
(initproc)Async_init, /* tp_init */
};
typedef struct libevwrapper_Prepare {
PyObject_HEAD
struct ev_prepare prepare;
struct libevwrapper_Loop *loop;
PyObject *callback;
} libevwrapper_Prepare;
static void
Prepare_dealloc(libevwrapper_Prepare *self) {
Py_XDECREF(self->loop);
Py_XDECREF(self->callback);
self->ob_type->tp_free((PyObject *)self);
}
static void prepare_callback(struct ev_loop *loop, ev_prepare *watcher, int revents) {
libevwrapper_Prepare *self = watcher->data;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
PyObject *result = PyObject_CallFunction(self->callback, "O", self);
if (!result) {
PyErr_WriteUnraisable(self->callback);
}
Py_XDECREF(result);
PyGILState_Release(gstate);
}
static int
Prepare_init(libevwrapper_Prepare *self, PyObject *args, PyObject *kwds) {
PyObject *callback;
PyObject *loop;
if (!PyArg_ParseTuple(args, "OO", &loop, &callback)) {
return -1;
}
if (loop) {
Py_INCREF(loop);
self->loop = (libevwrapper_Loop *)loop;
} else {
return -1;
}
if (callback) {
if (!PyCallable_Check(callback)) {
PyErr_SetString(PyExc_TypeError, "callback parameter must be callable");
Py_XDECREF(loop);
return -1;
}
Py_INCREF(callback);
self->callback = callback;
}
ev_prepare_init(&self->prepare, prepare_callback);
self->prepare.data = self;
return 0;
}
static PyObject *
Prepare_start(libevwrapper_Prepare *self, PyObject *args) {
ev_prepare_start(self->loop->loop, &self->prepare);
Py_RETURN_NONE;
}
static PyObject *
Prepare_stop(libevwrapper_Prepare *self, PyObject *args) {
ev_prepare_stop(self->loop->loop, &self->prepare);
Py_RETURN_NONE;
}
static PyMethodDef Prepare_methods[] = {
{"start", (PyCFunction)Prepare_start, METH_NOARGS, "Start the Prepare watcher"},
{"stop", (PyCFunction)Prepare_stop, METH_NOARGS, "Stop the Prepare watcher"},
{NULL} /* Sentinal */
};
static PyTypeObject libevwrapper_PrepareType = {
PyObject_HEAD_INIT(NULL)
0, /*ob_size*/
"cassandra.io.libevwrapper.Prepare", /*tp_name*/
sizeof(libevwrapper_Prepare), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Prepare_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/
"Prepare objects", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
Prepare_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)Prepare_init, /* tp_init */
};
static PyMethodDef module_methods[] = {
{NULL} /* Sentinal */
};
@@ -359,6 +476,10 @@ initlibevwrapper(void)
if (PyType_Ready(&libevwrapper_IOType) < 0)
return;
libevwrapper_PrepareType.tp_new = PyType_GenericNew;
if (PyType_Ready(&libevwrapper_PrepareType) < 0)
return;
libevwrapper_AsyncType.tp_new = PyType_GenericNew;
if (PyType_Ready(&libevwrapper_AsyncType) < 0)
return;
@@ -380,6 +501,10 @@ initlibevwrapper(void)
if (PyModule_AddObject(m, "IO", (PyObject *)&libevwrapper_IOType) == -1)
return;
Py_INCREF(&libevwrapper_PrepareType);
if (PyModule_AddObject(m, "Prepare", (PyObject *)&libevwrapper_PrepareType) == -1)
return;
Py_INCREF(&libevwrapper_AsyncType);
if (PyModule_AddObject(m, "Async", (PyObject *)&libevwrapper_AsyncType) == -1)
return;

View File

@@ -24,6 +24,8 @@ except ImportError as exc:
@patch('socket.socket')
@patch('cassandra.io.libevwrapper.IO')
@patch('cassandra.io.libevwrapper.Prepare')
@patch('cassandra.io.libevwrapper.Async')
@patch('cassandra.io.libevreactor._start_loop')
class LibevConnectionTest(unittest.TestCase):