Avoid calling trampoline for zmq sockets. Instead leave a listener registered in the hub until the socket is closed.

This commit is contained in:
Geoff Salmon
2012-01-26 13:57:57 -05:00
parent 7db155ab74
commit c2b8e02927

View File

@@ -22,6 +22,7 @@ class _QueueLock(object):
self._waiters = deque()
self._count = 0
self._holder = None
self._hub = hubs.get_hub()
def __nonzero__(self):
return self._count
@@ -37,7 +38,7 @@ class _QueueLock(object):
if (self._waiters or self._count > 0) and self._holder is not current:
# block until lock is free
self._waiters.append(current)
hubs.get_hub().switch()
self._hub.switch()
w = self._waiters.popleft()
assert w is current, 'Waiting threads woken out of order'
@@ -55,7 +56,7 @@ class _QueueLock(object):
self._holder = None
if self._waiters:
# wake next
hubs.get_hub().schedule_call_global(0, self._waiters[0].switch)
self._hub.schedule_call_global(0, self._waiters[0].switch)
class _SimpleEvent(object):
"""Represents a possibly blocked thread which may be blocked
@@ -67,6 +68,7 @@ class _SimpleEvent(object):
def __init__(self):
self._blocked_thread = None
self._wakeupper = None
self._hub = hubs.get_hub()
def __nonzero__(self):
return self._blocked_thread is not None
@@ -95,7 +97,7 @@ class _SimpleEvent(object):
blocked thread, then this call has no effect and returns
False."""
if self._blocked_thread is not None and self._wakeupper is None:
self._wakeupper = hubs.get_hub().schedule_call_global(0, self._blocked_thread.switch)
self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
return True
return False
@@ -207,10 +209,10 @@ class Socket(_Socket):
* recv_multipart
"""
_event_listener = None
def __init__(self, context, socket_type):
super(Socket, self).__init__(context, socket_type)
_Socket.__init__(self, context, socket_type)
self._in_trampoline = False
self._send_event = _SimpleEvent()
self._recv_event = _SimpleEvent()
@@ -234,28 +236,25 @@ class Socket(_Socket):
else:
self.recv = self.recv_multipart = self._send_not_supported
def _trampoline(self, evt):
"""Wait for events on the zmq socket. After this method
returns it is still possible that send and recv will return
EAGAIN.
def event(fd):
# Some events arrived at the zmq socket. This may mean
# there's a message that can be read or there's space for
# a message to be written.
self._send_event.wake()
self._recv_event.wake()
This supports being called by two separate greenthreads, a
sender and a receiver, but only the first caller will actually
call eventlet's trampoline method. The second thread will
still block.
"""
hub = hubs.get_hub()
self._event_listener = hub.add(hub.READ, self.getsockopt(FD), event)
if self._in_trampoline:
# Already a thread blocked in trampoline.
evt.block()
else:
try:
self._in_trampoline = True
with evt:
# Only trampoline on read events for zmq FDs, never write.
trampoline(self.getsockopt(FD), read=True)
finally:
self._in_trampoline = False
def close(self):
if self._event_listener is not None:
hubs.get_hub().remove(self._event_listener)
self._event_listener = None
# wake any blocked threads
self._send_event.wake()
self._recv_event.wake()
_Socket.close(self)
@_wraps(_Socket.send)
def send(self, msg, flags=0, copy=True, track=False):
@@ -273,7 +272,7 @@ class Socket(_Socket):
return _Socket_send(self, msg, flags, copy, track)
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(self._send_event)
self._send_event.block()
else:
raise
@@ -293,7 +292,7 @@ class Socket(_Socket):
return _Socket_recv(self, flags, copy, track)
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(self._recv_event)
self._recv_event.block()
else:
raise
@@ -306,10 +305,9 @@ class Socket(_Socket):
# there is a greenthread blocked and waiting for events,
# it will miss the edge-triggered read event, so wake it
# up.
if self._send_evt and (result & POLLOUT):
if (result & POLLOUT):
self._send_evt.wake()
if self._recv_evt and (result & POLLIN):
if (result & POLLIN):
self._recv_evt.wake()
return result
@@ -327,8 +325,11 @@ class Socket(_Socket):
"""
if flags & NOBLOCK:
result = _Socket_send(self, msg, flags, copy, track)
if self._send_event or self._recv_event:
self.getsockopt(EVENTS) # triggers wakeups
# Instead of calling both wake methods, could call
# self.getsockopt(EVENTS) which would trigger wakeups if
# needed.
self._send_event.wake()
self._recv_event.wake()
return result
# TODO: pyzmq will copy the message buffer and create Message
@@ -341,15 +342,14 @@ class Socket(_Socket):
return _Socket_send(self, msg, flags, copy, track)
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(self._send_event)
self._send_event.block()
else:
raise
finally:
# The call to send processes 0mq events and may
# make the socket ready to recv. Wake the next
# receiver. (Could check EVENTS for POLLIN here)
if self._recv_event:
self._recv_event.wake()
self._recv_event.wake()
@_wraps(_Socket.send_multipart)
@@ -374,27 +374,28 @@ class Socket(_Socket):
"""
if flags & NOBLOCK:
msg = _Socket_recv(self, flags, copy, track)
if self._send_event or self._recv_event:
self.getsockopt(EVENTS) # triggers wakeups
# Instead of calling both wake methods, could call
# self.getsockopt(EVENTS) which would trigger wakeups if
# needed.
self._send_event.wake()
self._recv_event.wake()
return msg
flags |= NOBLOCK
with self._recv_lock:
while True:
try:
try:
return _Socket_recv(self, flags, copy, track)
finally:
# The call to recv processes 0mq events and may
# make the socket ready to send. Wake the next
# receiver. (Could check EVENTS for POLLOUT here)
if self._send_event:
self._send_event.wake()
return _Socket_recv(self, flags, copy, track)
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(self._recv_event)
self._recv_event.block()
else:
raise
finally:
# The call to recv processes 0mq events and may
# make the socket ready to send. Wake the next
# receiver. (Could check EVENTS for POLLOUT here)
self._send_event.wake()
@_wraps(_Socket.recv_multipart)
def _xsafe_recv_multipart(self, flags=0, copy=True, track=False):