use interruptible trampoline for zmq REQ and REP sockets. Prefer positional args when have the choice.

This commit is contained in:
Geoff Salmon
2011-09-05 17:34:33 -04:00
parent 40d4e045d4
commit 3f2805d12b

View File

@@ -63,8 +63,6 @@ _disable_recv_types = set([__zmq__.PUB, __zmq__.PUSH])
# TODO:
# - Ensure that recv* and send* methods raise error when called on a
# closed socket. They should not block.
# - Ensure that recv* and send* methods raise EFSM error when socket
# is in improper state. Avoid blocking.
# - Return correct message tracker from send* methods
# - Make MessageTracker.wait zmq friendly
# - What should happen to threads blocked on send/recv when socket is
@@ -117,23 +115,44 @@ class Socket(__zmq__.Socket):
elif socket_type in _disable_recv_types:
self.recv = self.recv_multipart = self._recv_not_supported
def _sock_wait(self, read=False, write=False):
"""
First checks if there are events in the socket, to avoid
edge trigger problems with race conditions. Then if there
are none it will trampoline and when coming back check
for the events.
"""
events = self._super_getsockopt(__zmq__.EVENTS)
def _trampoline(self):
"""Wait for events on the zmq socket. After this method
returns it is still possible that send and recv will return
EAGAIN.
if read and (events & __zmq__.POLLIN):
return events
elif write and (events & __zmq__.POLLOUT):
return events
else:
# ONLY trampoline on read events for the zmq FD
Because the zmq FD is edge triggered, any call that causes the
zmq socket to process its events must wake the greenthread
that called trampoline by calling _wake_listener in case it
missed the event."""
try:
self._blocked_thread = greenlet.getcurrent()
# Only trampoline on read events for zmq FDs, never write.
trampoline(self._fd, read=True)
return self._super_getsockopt(__zmq__.EVENTS)
finally:
self._blocked_thread = None
# Either the fd is readable or we were woken by
# another thread. Cleanup the wakeup timer.
t = self._wakeup_timer
if t is not None:
# Important to cancel the timer so it doesn't
# spuriously wake this greenthread later on.
t.cancel()
self._wakeup_timer = None
def _wake_listener(self):
"""If a thread has called trampoline, wake it up. This can
safely be called multiple times and will have no effect if the
thread has already been woken up.
Returns True if there is a listener thread that called
trampoline, False if not."""
is_listener = self._blocked_thread is not None
if is_listener and self._wakeup_timer is None:
self._wakeup_timer = hubs.get_hub().schedule_call_global(0, self._blocked_thread.switch)
return True
return is_listener
def send(self, msg, flags=0, copy=True, track=False):
"""
@@ -142,17 +161,17 @@ class Socket(__zmq__.Socket):
called in real code.
"""
if flags & __zmq__.NOBLOCK:
super(Socket, self).send(msg, flags=flags, copy=copy, track=track)
return
return super(Socket, self).send(msg, flags, copy, track)
flags |= __zmq__.NOBLOCK
while True:
try:
self._sock_wait(write=True)
return super(Socket, self).send(msg, flags=flags, copy=copy, track=track)
return super(Socket, self).send(msg, flags, copy, track)
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
if e.errno == EAGAIN:
self._trampoline()
else:
raise
def recv(self, flags=0, copy=True, track=False):
@@ -162,16 +181,17 @@ class Socket(__zmq__.Socket):
called in real code.
"""
if flags & __zmq__.NOBLOCK:
return super(Socket, self).recv(flags=flags, copy=copy, track=track)
return super(Socket, self).recv(flags, copy, track)
flags |= __zmq__.NOBLOCK
while True:
try:
self._sock_wait(read=True)
return super(Socket, self).recv(flags=flags, copy=copy, track=track)
return super(Socket, self).recv(flags, copy, track)
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
if e.errno == EAGAIN:
self._trampoline()
else:
raise
def getsockopt(self, option):
@@ -189,10 +209,10 @@ class Socket(__zmq__.Socket):
return result
def _send_not_supported(self, msg, flags=0, copy=True, track=False):
def _send_not_supported(self, msg, flags, copy, track):
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
def _recv_not_supported(self, flags=0, copy=True, track=False):
def _recv_not_supported(self, flags, copy, track):
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
def _xsafe_send(self, msg, flags=0, copy=True, track=False):
@@ -203,7 +223,7 @@ class Socket(__zmq__.Socket):
"""
if flags & __zmq__.NOBLOCK:
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
result = super(Socket, self).send(msg, flags=flags, copy=copy, track=track)
result = super(Socket, self).send(msg, flags, copy, track)
self._wake_listener()
return result
@@ -220,7 +240,7 @@ class Socket(__zmq__.Socket):
if flags & __zmq__.NOBLOCK:
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
result = super(Socket, self).send_multipart(msg_parts, flags=flags, copy=copy, track=track)
result = super(Socket, self).send_multipart(msg_parts, flags, copy, track)
self._wake_listener()
return result
@@ -233,11 +253,9 @@ class Socket(__zmq__.Socket):
# immediately. This is the fast path.
try:
if multi:
r = super(Socket, self).send_multipart(
msg, flags=flags, copy=copy, track=track)
r = super(Socket, self).send_multipart(msg, flags, copy, track)
else:
r = super(Socket, self).send(
msg, flags=flags, copy=copy, track=track)
r = super(Socket, self).send( msg, flags, copy, track)
self._wake_listener()
return r
@@ -262,7 +280,7 @@ class Socket(__zmq__.Socket):
if flags & __zmq__.NOBLOCK:
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
msg = super(Socket, self).recv(flags=flags, copy=copy, track=track)
msg = super(Socket, self).recv(flags, copy, track)
self._wake_listener()
return msg
@@ -276,7 +294,7 @@ class Socket(__zmq__.Socket):
"""
if flags & __zmq__.NOBLOCK:
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
msg = super(Socket, self).recv_multipart(flags=flags, copy=copy, track=track)
msg = super(Socket, self).recv_multipart(flags, copy, track)
self._wake_listener()
return msg
@@ -289,11 +307,9 @@ class Socket(__zmq__.Socket):
# immediately. This is the fast path.
try:
if multi:
msg = super(Socket, self).recv_multipart(
flags=flags, copy=copy, track=track)
msg = super(Socket, self).recv_multipart(flags, copy, track)
else:
msg = super(Socket, self).recv(
flags=flags, copy=copy, track=track)
msg = super(Socket, self).recv(flags, copy, track)
self._wake_listener()
return msg
@@ -317,15 +333,6 @@ class Socket(__zmq__.Socket):
return self._process_queues()
def _wake_listener(self):
is_listener = self._blocked_thread is not None
if is_listener and self._wakeup_timer is None:
self._wakeup_timer = hubs.get_hub().schedule_call_global(0, self._blocked_thread.switch)
return True
return is_listener
def _process_queues(self):
""" If there are readers or writers queued, this method tries
to recv or send messages and ensures processing continues
@@ -383,21 +390,8 @@ class Socket(__zmq__.Socket):
if next_thread:
# Only trampoline if this thread is the next reader or writer
if next_reader is current or next_writer is current:
try:
self._blocked_thread = current
# Only trampoline on read events for zmq FDs, never write.
trampoline(self._fd, read=True)
continue
finally:
self._blocked_thread = None
# Either the fd is readable or we were woken by
# another thread. Cleanup the wakeup timer.
t = self._wakeup_timer
if t is not None:
# Important to cancel the timer so it doesn't
# spuriously wake this greenthread later on.
t.cancel()
self._wakeup_timer = None
self._trampoline()
continue
else:
# This greenthread's work is done. Wake another to
# continue processing the queues if there is one
@@ -423,9 +417,9 @@ class Socket(__zmq__.Socket):
writer, multi, msg, flags, copy, track = writers[0]
try:
if multi:
r = super_send_multipart(msg, flags=flags, copy=copy, track=track)
r = super_send_multipart(msg, flags, copy, track)
else:
r = super_send(msg, flags=flags, copy=copy, track=track)
r = super_send(msg, flags, copy, track)
# remember this thread's result
if current is writer: