Minor refactor of loop state handling

This commit is contained in:
Tyler Hobbs
2014-06-18 14:16:36 -05:00
parent 6bc5f94443
commit 7e4f90c908

View File

@@ -16,7 +16,7 @@ Module that implements an event loop based on twisted
( https://twistedmatrix.com ).
"""
from twisted.internet import reactor, protocol
from threading import Event, Thread
from threading import Event, Thread, Lock
from functools import partial
import logging
import weakref
@@ -36,7 +36,7 @@ log = logging.getLogger(__name__)
def _cleanup(cleanup_weakref):
try:
cleanup_weakref()
cleanup_weakref()._cleanup()
except ReferenceError:
return
@@ -46,6 +46,7 @@ class TwistedConnectionProtocol(protocol.Protocol):
Twisted Protocol class for handling data received and connection
made events.
"""
def dataReceived(self, data):
"""
Callback function that is called when data has been received
@@ -68,6 +69,7 @@ class TwistedConnectionProtocol(protocol.Protocol):
class TwistedConnectionClientFactory(protocol.ClientFactory):
def __init__(self, connection):
# ClientFactory does not define __init__() in parent classes
# and does not inherit from object.
@@ -97,14 +99,49 @@ class TwistedConnectionClientFactory(protocol.ClientFactory):
self.conn.close()
class TwistedLoop(object):
_lock = None
_thread = None
def __init__(self):
self._lock = Lock()
def maybe_start(self):
with self._lock:
if not reactor.running:
self._thread = Thread(target=reactor.run,
name="cassandra_driver_event_loop",
kwargs={'installSignalHandlers': False})
self._thread.daemon = True
self._thread.start()
atexit.register(partial(_cleanup, weakref.ref(self)))
def _cleanup(self):
if self._thread:
reactor.callFromThread(reactor.stop)
self._thread.join(timeout=1.0)
if self._thread.is_alive():
log.warning("Event loop thread could not be joined, so "
"shutdown may not be clean. Please call "
"Cluster.shutdown() to avoid this.")
log.debug("Event loop thread was joined")
class TwistedConnection(Connection):
"""
An implementation of :class:`.Connection` that utilizes the
Twisted event loop.
"""
_loop = None
_total_reqd_bytes = 0
@classmethod
def initialize_reactor(cls):
if not cls._loop:
cls._loop = TwistedLoop()
@classmethod
def factory(cls, *args, **kwargs):
"""
@@ -136,20 +173,12 @@ class TwistedConnection(Connection):
self.connected_event = Event()
self._iobuf = BytesIO()
self._thread = None
self.is_closed = True
self.connector = None
self._callbacks = {}
reactor.callFromThread(self.add_connection)
if not reactor.running: # XXX: might want a lock here?
self._thread = Thread(target=reactor.run,
name="cassandra_driver_event_loop",
kwargs={'installSignalHandlers': False})
self._thread.daemon = True
self._thread.start()
atexit.register(partial(_cleanup, weakref.ref(self)))
self._loop.maybe_start()
def add_connection(self):
"""
@@ -236,16 +265,6 @@ class TwistedConnection(Connection):
"""
reactor.callFromThread(self.connector.transport.write, data)
def _cleanup(self):
if self._thread:
reactor.callFromThread(reactor.stop)
self._thread.join(timeout=1.0)
if self._thread.is_alive():
log.warning("Event loop thread could not be joined, so "
"shutdown may not be clean. Please call "
"Cluster.shutdown() to avoid this.")
log.debug("Event loop thread was joined")
def register_watcher(self, event_type, callback, register_timeout=None):
"""
Register a callback for a given event type.