Restored multiple readers.
This commit is contained in:
@@ -50,6 +50,7 @@ class BaseHub(object):
|
|||||||
|
|
||||||
def __init__(self, clock=time.time):
|
def __init__(self, clock=time.time):
|
||||||
self.listeners = {READ:{}, WRITE:{}}
|
self.listeners = {READ:{}, WRITE:{}}
|
||||||
|
self.secondaries = {READ:{}, WRITE:{}}
|
||||||
|
|
||||||
self.clock = clock
|
self.clock = clock
|
||||||
self.greenlet = greenlet.greenlet(self.run)
|
self.greenlet = greenlet.greenlet(self.run)
|
||||||
@@ -73,18 +74,32 @@ class BaseHub(object):
|
|||||||
listener = self.lclass(evtype, fileno, cb)
|
listener = self.lclass(evtype, fileno, cb)
|
||||||
bucket = self.listeners[evtype]
|
bucket = self.listeners[evtype]
|
||||||
if fileno in bucket:
|
if fileno in bucket:
|
||||||
raise RuntimeError("Multiple %s for fileno %s" % (evtype, fileno))
|
# store off the second listener in another structure
|
||||||
bucket[fileno] = listener
|
self.secondaries[evtype].setdefault(fileno, []).append(listener)
|
||||||
|
else:
|
||||||
|
bucket[fileno] = listener
|
||||||
return listener
|
return listener
|
||||||
|
|
||||||
def remove(self, listener):
|
def remove(self, listener):
|
||||||
self.listeners[listener.evtype].pop(listener.fileno, None)
|
fileno = listener.fileno
|
||||||
|
evtype = listener.evtype
|
||||||
|
self.listeners[evtype].pop(fileno, None)
|
||||||
|
# migrate a secondary listener to be the primary listener
|
||||||
|
if fileno in self.secondaries[evtype]:
|
||||||
|
sec = self.secondaries[evtype].get(fileno, ())
|
||||||
|
if not sec:
|
||||||
|
return
|
||||||
|
self.listeners[evtype][fileno] = sec.pop(0)
|
||||||
|
if not sec:
|
||||||
|
del self.secondaries[evtype][fileno]
|
||||||
|
|
||||||
def remove_descriptor(self, fileno):
|
def remove_descriptor(self, fileno):
|
||||||
""" Completely remove all listeners for this fileno. For internal use
|
""" Completely remove all listeners for this fileno. For internal use
|
||||||
only."""
|
only."""
|
||||||
self.listeners[READ].pop(fileno, None)
|
self.listeners[READ].pop(fileno, None)
|
||||||
self.listeners[WRITE].pop(fileno, None)
|
self.listeners[WRITE].pop(fileno, None)
|
||||||
|
self.secondaries[READ].pop(fileno, None)
|
||||||
|
self.secondaries[WRITE].pop(fileno, None)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.abort()
|
self.abort()
|
||||||
|
@@ -108,12 +108,7 @@ class Hub(BaseHub):
|
|||||||
elif evtype is WRITE:
|
elif evtype is WRITE:
|
||||||
evt = event.write(fileno, cb, fileno)
|
evt = event.write(fileno, cb, fileno)
|
||||||
|
|
||||||
listener = FdListener(evtype, fileno, evt)
|
return super(Hub,self).add(evtype, fileno, evt)
|
||||||
bucket = self.listeners[evtype]
|
|
||||||
if fileno in bucket:
|
|
||||||
raise RuntimeError("Multiple %s for fileno %s" % (evtype, fileno))
|
|
||||||
bucket[fileno] = listener
|
|
||||||
return listener
|
|
||||||
|
|
||||||
def signal(self, signalnum, handler):
|
def signal(self, signalnum, handler):
|
||||||
def wrapper():
|
def wrapper():
|
||||||
|
Reference in New Issue
Block a user