Avoid early termination of libev event loop

This commit is contained in:
Tyler Hobbs
2014-04-16 14:42:59 -05:00
parent ec598963fc
commit 59d09c3705
2 changed files with 62 additions and 63 deletions

View File

@@ -1,4 +1,6 @@
import atexit
from collections import deque
from functools import partial
import logging
import os
import socket
@@ -31,63 +33,60 @@ except ImportError:
log = logging.getLogger(__name__)
_loop = libev.Loop()
_loop_notifier = libev.Async(_loop)
_loop_notifier.start()
# prevent _loop_notifier from keeping the loop from returning
_loop.unref()
_loop_started = None
_loop_lock = Lock()
_shutdown = False
def _run_loop():
while True:
end_condition = _loop.start()
# there are still active watchers, no deadlock
with _loop_lock:
if not _shutdown and end_condition:
log.debug("Restarting event loop")
continue
else:
# all Connections have been closed, no active watchers
log.debug("All Connections currently closed, event loop ended")
global _loop_started
_loop_started = False
break
def _start_loop():
global _loop_started
should_start = False
with _loop_lock:
if not _loop_started:
log.debug("Starting libev event loop")
_loop_started = True
should_start = True
if should_start:
t = Thread(target=_run_loop, name="event_loop")
t.daemon = True
t.start()
return should_start
def _cleanup(thread):
global _shutdown
_shutdown = True
log.debug("Waiting for event loop thread to join...")
thread.join()
log.debug("Event loop thread was joined")
class LibevConnection(Connection):
"""
An implementation of :class:`.Connection` that uses libev for its event loop.
"""
_loop = libev.Loop()
_loop_notifier = libev.Async(_loop)
_loop_notifier.start()
# prevent _loop_notifier from keeping the loop from returning
_loop.unref()
_loop_started = None
_loop_lock = Lock()
_loop_shutdown = False
@classmethod
def _run_loop(cls):
while True:
end_condition = cls._loop.start()
# there are still active watchers, no deadlock
with cls._loop_lock:
if not cls._loop_shutdown and (end_condition or cls._live_conns):
log.debug("Restarting event loop")
continue
else:
# all Connections have been closed, no active watchers
log.debug("All Connections currently closed, event loop ended")
cls._loop_started = False
break
@classmethod
def _maybe_start_loop(cls):
should_start = False
with cls._loop_lock:
if not cls._loop_started:
log.debug("Starting libev event loop")
cls._loop_started = True
should_start = True
if should_start:
t = Thread(target=cls._run_loop, name="event_loop")
t.daemon = True
t.start()
atexit.register(partial(cls._cleanup, t))
return should_start
@classmethod
def _cleanup(cls, thread):
cls._loop_shutdown = True
log.debug("Waiting for event loop thread to join...")
thread.join()
log.debug("Event loop thread was joined")
# class-level set of all connections; only replaced with a new copy
# while holding _conn_set_lock, never modified in place
@@ -178,7 +177,7 @@ class LibevConnection(Connection):
changed = True
if changed:
_loop_notifier.send()
cls._loop_notifier.send()
def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)
@@ -203,17 +202,17 @@ class LibevConnection(Connection):
for args in self.sockopts:
self._socket.setsockopt(*args)
with _loop_lock:
self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, _loop, self.handle_read)
self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, _loop, self.handle_write)
with self._loop_lock:
self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, self._loop, self.handle_read)
self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, self._loop, self.handle_write)
self._send_options_message()
self.__class__._connection_created(self)
# start the global event loop if needed
_start_loop()
_loop_notifier.send()
self._maybe_start_loop()
self._loop_notifier.send()
def close(self):
with self.lock:
@@ -223,7 +222,7 @@ class LibevConnection(Connection):
log.debug("Closing connection (%s) to %s", id(self), self.host)
self.__class__._connection_destroyed(self)
_loop_notifier.send()
self._loop_notifier.send()
self._socket.close()
# don't leave in-progress operations hanging
@@ -333,7 +332,7 @@ class LibevConnection(Connection):
with self._deque_lock:
self.deque.extend(chunks)
_loop_notifier.send()
self._loop_notifier.send()
def register_watcher(self, event_type, callback, register_timeout=None):
self._push_watchers[event_type].add(callback)
@@ -347,7 +346,7 @@ class LibevConnection(Connection):
RegisterMessage(event_list=type_callback_dict.keys()), timeout=register_timeout)
_preparer = libev.Prepare(_loop, LibevConnection.loop_will_run)
_preparer = libev.Prepare(LibevConnection._loop, LibevConnection.loop_will_run)
# prevent _preparer from keeping the loop from returning
_loop.unref()
LibevConnection._loop.unref()
_preparer.start()

View File

@@ -27,7 +27,7 @@ except ImportError:
@patch('cassandra.io.libevwrapper.IO')
@patch('cassandra.io.libevwrapper.Prepare')
@patch('cassandra.io.libevwrapper.Async')
@patch('cassandra.io.libevreactor._start_loop')
@patch('cassandra.io.libevreactor.LibevConnection._maybe_start_loop')
class LibevConnectionTest(unittest.TestCase):
def setUp(self):