Merge pull request #567 from datastax/239
PYTHON-239 - reduce CPU usage at idle for asyncorereactor
This commit is contained in:
@@ -18,7 +18,7 @@ import logging
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
from threading import Event, Lock, Thread
|
||||
from threading import Lock, Thread
|
||||
import time
|
||||
import weakref
|
||||
|
||||
@@ -36,12 +36,11 @@ try:
|
||||
except ImportError:
|
||||
ssl = None # NOQA
|
||||
|
||||
from cassandra.connection import (Connection, ConnectionShutdown,
|
||||
ConnectionException, NONBLOCKING,
|
||||
Timer, TimerManager)
|
||||
from cassandra.connection import Connection, ConnectionShutdown, NONBLOCKING, Timer, TimerManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_dispatcher_map = {}
|
||||
|
||||
def _cleanup(loop_weakref):
|
||||
try:
|
||||
@@ -52,8 +51,128 @@ def _cleanup(loop_weakref):
|
||||
loop._cleanup()
|
||||
|
||||
|
||||
class _PipeWrapper(object):
|
||||
|
||||
def __init__(self, fd):
|
||||
self.fd = fd
|
||||
|
||||
def fileno(self):
|
||||
return self.fd
|
||||
|
||||
def close(self):
|
||||
os.close(self.fd)
|
||||
|
||||
|
||||
class _AsyncoreDispatcher(asyncore.dispatcher):
|
||||
|
||||
def __init__(self, socket):
|
||||
asyncore.dispatcher.__init__(self, map=_dispatcher_map)
|
||||
# inject after to avoid base class validation
|
||||
self.set_socket(socket)
|
||||
self._notified = False
|
||||
|
||||
def writable(self):
|
||||
return False
|
||||
|
||||
def validate(self):
|
||||
assert not self._notified
|
||||
self.notify_loop()
|
||||
assert self._notified
|
||||
self.loop(0.1)
|
||||
assert not self._notified
|
||||
|
||||
def loop(self, timeout):
|
||||
asyncore.loop(timeout=timeout, use_poll=True, map=_dispatcher_map, count=1)
|
||||
|
||||
|
||||
class _AsyncorePipeDispatcher(_AsyncoreDispatcher):
|
||||
|
||||
def __init__(self):
|
||||
self.read_fd, self.write_fd = os.pipe()
|
||||
_AsyncoreDispatcher.__init__(self, _PipeWrapper(self.read_fd))
|
||||
|
||||
def writable(self):
|
||||
return False
|
||||
|
||||
def handle_read(self):
|
||||
while len(os.read(self.read_fd, 4096)) == 4096:
|
||||
pass
|
||||
self._notified = False
|
||||
|
||||
def notify_loop(self):
|
||||
if not self._notified:
|
||||
self._notified = True
|
||||
os.write(self.write_fd, 'x')
|
||||
|
||||
|
||||
class _AsyncoreUDPDispatcher(_AsyncoreDispatcher):
|
||||
"""
|
||||
Experimental alternate dispatcher for avoiding busy wait in the asyncore loop. It is not used by default because
|
||||
it relies on local port binding.
|
||||
Port scanning is not implemented, so multiple clients on one host will collide. This address would need to be set per
|
||||
instance, or this could be specialized to scan until an address is found.
|
||||
|
||||
To use::
|
||||
|
||||
from cassandra.io.asyncorereactor import _AsyncoreUDPDispatcher, AsyncoreLoop
|
||||
AsyncoreLoop._loop_dispatch_class = _AsyncoreUDPDispatcher
|
||||
|
||||
"""
|
||||
bind_address = ('localhost', 10000)
|
||||
|
||||
def __init__(self):
|
||||
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self._socket.bind(self.bind_address)
|
||||
self._socket.setblocking(0)
|
||||
_AsyncoreDispatcher.__init__(self, self._socket)
|
||||
|
||||
def handle_read(self):
|
||||
try:
|
||||
d = self._socket.recvfrom(1)
|
||||
while d and d[1]:
|
||||
d = self._socket.recvfrom(1)
|
||||
except socket.error as e:
|
||||
pass
|
||||
self._notified = False
|
||||
|
||||
def notify_loop(self):
|
||||
if not self._notified:
|
||||
self._notified = True
|
||||
self._socket.sendto(b'', self.bind_address)
|
||||
|
||||
def loop(self, timeout):
|
||||
asyncore.loop(timeout=timeout, use_poll=False, map=_dispatcher_map, count=1)
|
||||
|
||||
|
||||
class _BusyWaitDispatcher(object):
|
||||
|
||||
max_write_latency = 0.001
|
||||
"""
|
||||
Timeout pushed down to asyncore select/poll. Dictates the amount of time it will sleep before coming back to check
|
||||
if anything is writable.
|
||||
"""
|
||||
|
||||
def notify_loop(self):
|
||||
pass
|
||||
|
||||
def loop(self, timeout):
|
||||
if not _dispatcher_map:
|
||||
time.sleep(0.005)
|
||||
count = timeout // self.max_write_latency
|
||||
asyncore.loop(timeout=self.max_write_latency, use_poll=True, map=_dispatcher_map, count=count)
|
||||
|
||||
def validate(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class AsyncoreLoop(object):
|
||||
|
||||
timer_resolution = 0.1 # used as the max interval to be in the io loop before returning to service timeouts
|
||||
|
||||
_loop_dispatch_class = _AsyncorePipeDispatcher if os.name != 'nt' else _BusyWaitDispatcher
|
||||
|
||||
def __init__(self):
|
||||
self._pid = os.getpid()
|
||||
@@ -65,6 +184,16 @@ class AsyncoreLoop(object):
|
||||
|
||||
self._timers = TimerManager()
|
||||
|
||||
try:
|
||||
dispatcher = self._loop_dispatch_class()
|
||||
dispatcher.validate()
|
||||
log.debug("Validated loop dispatch with %s", self._loop_dispatch_class)
|
||||
except Exception:
|
||||
log.exception("Failed validating loop dispatch with %s. Using busy wait execution instead.", self._loop_dispatch_class)
|
||||
dispatcher.close()
|
||||
dispatcher = _BusyWaitDispatcher()
|
||||
self._loop_dispatcher = dispatcher
|
||||
|
||||
atexit.register(partial(_cleanup, weakref.ref(self)))
|
||||
|
||||
def maybe_start(self):
|
||||
@@ -84,15 +213,16 @@ class AsyncoreLoop(object):
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def wake_loop(self):
|
||||
self._loop_dispatcher.notify_loop()
|
||||
|
||||
def _run_loop(self):
|
||||
log.debug("Starting asyncore event loop")
|
||||
with self._loop_lock:
|
||||
while not self._shutdown:
|
||||
try:
|
||||
asyncore.loop(timeout=0.001, use_poll=True, count=100)
|
||||
self._loop_dispatcher.loop(self.timer_resolution)
|
||||
self._timers.service_timeouts()
|
||||
if not asyncore.socket_map:
|
||||
time.sleep(0.005)
|
||||
except Exception:
|
||||
log.debug("Asyncore event loop stopped unexepectedly", exc_info=True)
|
||||
break
|
||||
@@ -154,13 +284,12 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Connection.__init__(self, *args, **kwargs)
|
||||
asyncore.dispatcher.__init__(self)
|
||||
|
||||
self.deque = deque()
|
||||
self.deque_lock = Lock()
|
||||
|
||||
self._connect_socket()
|
||||
asyncore.dispatcher.__init__(self, self._socket)
|
||||
asyncore.dispatcher.__init__(self, self._socket, _dispatcher_map)
|
||||
|
||||
self._writable = True
|
||||
self._readable = True
|
||||
@@ -254,6 +383,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
|
||||
with self.deque_lock:
|
||||
self.deque.extend(chunks)
|
||||
self._writable = True
|
||||
self._loop.wake_loop()
|
||||
|
||||
def writable(self):
|
||||
return self._writable
|
||||
|
||||
Reference in New Issue
Block a user