diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index b776e0e6..7dc66a93 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -1,4 +1,6 @@ +import atexit from collections import deque +from functools import partial import logging import os import socket @@ -31,63 +33,60 @@ except ImportError: log = logging.getLogger(__name__) -_loop = libev.Loop() -_loop_notifier = libev.Async(_loop) -_loop_notifier.start() - -# prevent _loop_notifier from keeping the loop from returning -_loop.unref() - -_loop_started = None -_loop_lock = Lock() -_shutdown = False - - -def _run_loop(): - while True: - end_condition = _loop.start() - # there are still active watchers, no deadlock - with _loop_lock: - if not _shutdown and end_condition: - log.debug("Restarting event loop") - continue - else: - # all Connections have been closed, no active watchers - log.debug("All Connections currently closed, event loop ended") - global _loop_started - _loop_started = False - break - - -def _start_loop(): - global _loop_started - should_start = False - with _loop_lock: - if not _loop_started: - log.debug("Starting libev event loop") - _loop_started = True - should_start = True - - if should_start: - t = Thread(target=_run_loop, name="event_loop") - t.daemon = True - t.start() - - return should_start - - -def _cleanup(thread): - global _shutdown - _shutdown = True - log.debug("Waiting for event loop thread to join...") - thread.join() - log.debug("Event loop thread was joined") - class LibevConnection(Connection): """ An implementation of :class:`.Connection` that uses libev for its event loop. """ + _loop = libev.Loop() + _loop_notifier = libev.Async(_loop) + _loop_notifier.start() + + # prevent _loop_notifier from keeping the loop from returning + _loop.unref() + + _loop_started = None + _loop_lock = Lock() + _loop_shutdown = False + + @classmethod + def _run_loop(cls): + while True: + end_condition = cls._loop.start() + # there are still active watchers, no deadlock + with cls._loop_lock: + if not cls._loop_shutdown and (end_condition or cls._live_conns): + log.debug("Restarting event loop") + continue + else: + # all Connections have been closed, no active watchers + log.debug("All Connections currently closed, event loop ended") + cls._loop_started = False + break + + @classmethod + def _maybe_start_loop(cls): + should_start = False + with cls._loop_lock: + if not cls._loop_started: + log.debug("Starting libev event loop") + cls._loop_started = True + should_start = True + + if should_start: + t = Thread(target=cls._run_loop, name="event_loop") + t.daemon = True + t.start() + atexit.register(partial(cls._cleanup, t)) + + return should_start + + @classmethod + def _cleanup(cls, thread): + cls._loop_shutdown = True + log.debug("Waiting for event loop thread to join...") + thread.join() + log.debug("Event loop thread was joined") # class-level set of all connections; only replaced with a new copy # while holding _conn_set_lock, never modified in place @@ -178,7 +177,7 @@ class LibevConnection(Connection): changed = True if changed: - _loop_notifier.send() + cls._loop_notifier.send() def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) @@ -203,17 +202,17 @@ class LibevConnection(Connection): for args in self.sockopts: self._socket.setsockopt(*args) - with _loop_lock: - self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, _loop, self.handle_read) - self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, _loop, self.handle_write) + with self._loop_lock: + self._read_watcher = libev.IO(self._socket._sock, libev.EV_READ, self._loop, self.handle_read) + self._write_watcher = libev.IO(self._socket._sock, libev.EV_WRITE, self._loop, self.handle_write) self._send_options_message() self.__class__._connection_created(self) # start the global event loop if needed - _start_loop() - _loop_notifier.send() + self._maybe_start_loop() + self._loop_notifier.send() def close(self): with self.lock: @@ -223,7 +222,7 @@ class LibevConnection(Connection): log.debug("Closing connection (%s) to %s", id(self), self.host) self.__class__._connection_destroyed(self) - _loop_notifier.send() + self._loop_notifier.send() self._socket.close() # don't leave in-progress operations hanging @@ -333,7 +332,7 @@ class LibevConnection(Connection): with self._deque_lock: self.deque.extend(chunks) - _loop_notifier.send() + self._loop_notifier.send() def register_watcher(self, event_type, callback, register_timeout=None): self._push_watchers[event_type].add(callback) @@ -347,7 +346,7 @@ class LibevConnection(Connection): RegisterMessage(event_list=type_callback_dict.keys()), timeout=register_timeout) -_preparer = libev.Prepare(_loop, LibevConnection.loop_will_run) +_preparer = libev.Prepare(LibevConnection._loop, LibevConnection.loop_will_run) # prevent _preparer from keeping the loop from returning -_loop.unref() +LibevConnection._loop.unref() _preparer.start() diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index 408af29a..a8a73274 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -27,7 +27,7 @@ except ImportError: @patch('cassandra.io.libevwrapper.IO') @patch('cassandra.io.libevwrapper.Prepare') @patch('cassandra.io.libevwrapper.Async') -@patch('cassandra.io.libevreactor._start_loop') +@patch('cassandra.io.libevreactor.LibevConnection._maybe_start_loop') class LibevConnectionTest(unittest.TestCase): def setUp(self):