Rename zmq._SimpleEvent. Add close during recv test. Updating comments.

This commit is contained in:
Geoff Salmon
2012-01-26 15:12:01 -05:00
parent c2b8e02927
commit 19ea50f694
2 changed files with 54 additions and 79 deletions

View File

@@ -58,12 +58,11 @@ class _QueueLock(object):
# wake next
self._hub.schedule_call_global(0, self._waiters[0].switch)
class _SimpleEvent(object):
"""Represents a possibly blocked thread which may be blocked
inside this class' block method or inside a trampoline call. In
either case, the threads can be awoken by calling wake(). Wake()
can be called multiple times and all but the first call will have
no effect."""
class _BlockedThread(object):
"""Is either empty, or represents a single blocked thread that
blocked itself by calling the block() method. The thread can be
awoken by calling wake(). Wake() can be called multiple times and
all but the first call will have no effect."""
def __init__(self):
self._blocked_thread = None
@@ -74,22 +73,20 @@ class _SimpleEvent(object):
return self._blocked_thread is not None
def block(self):
with self:
hubs.get_hub().switch()
def __enter__(self):
if self._blocked_thread is not None:
raise Exception("Cannot block more than one thread on one SimpleEvent")
raise Exception("Cannot block more than one thread on one BlockedThread")
self._blocked_thread = greenlet.getcurrent()
def __exit__(self, type, value, traceback):
self._blocked_thread = None
# cleanup the wakeup task
if self._wakeupper is not None:
# Important to cancel the wakeup task so it doesn't
# spuriously wake this greenthread later on.
self._wakeupper.cancel()
self._wakeupper = None
try:
self._hub.switch()
finally:
self._blocked_thread = None
# cleanup the wakeup task
if self._wakeupper is not None:
# Important to cancel the wakeup task so it doesn't
# spuriously wake this greenthread later on.
self._wakeupper.cancel()
self._wakeupper = None
def wake(self):
"""Schedules the blocked thread to be awoken and return
@@ -136,21 +133,11 @@ def _wraps(source_fn):
# the 0mq context is associated with, which is the native thread the
# greenthreads are running on, and the only operations that cause the
# events to be read and processed are send(), recv() and
# getsockopt(EVENTS). This means that after doing any of these three
# operations, the ability of the socket to send or receive a message
# without blocking may have changed. If you're not careful, this can
# cause the hub to miss the read event for the socket.
#
# For example, suppose thread A calls trampoline and blocks because it
# called recv() when there was no waiting message. It should be
# notified when the state of the socket changes. However, while thread
# A is blocked, thread B calls send(), which internally causes the
# events to be processed, and the socket learns that it has a message
# waiting to be received. Unfortunately, because eventlet is currently
# running greenthread B, it isn't currently blocked in hub.wait() in
# poll or the equivalent. When hub.wait() is eventually called, the
# socket's event pipe will no longer be readable, so thread A will not
# be awoken, even though a message is waiting to be read!
# getsockopt(zmq.EVENTS). This means that after doing any of these
# three operations, the ability of the socket to send or receive a
# message without blocking may have changed, but after the events are
# read the FD is no longer readable so the hub may not signal our
# listener.
#
# If we understand that after calling send() a message might be ready
# to be received and that after calling recv() a message might be able
@@ -168,21 +155,16 @@ def _wraps(source_fn):
# 2. Call getsockopt(zmq.EVENTS) and explicitly check if the other
# thread should be woken up. This avoids spurious wake-ups but may
# add overhead because getsockopt will cause all events to be
# processed, whereas send and recv can avoid processing
# processed, whereas send and recv throttle processing
# events. Admittedly, all of the events will need to be processed
# eventually, but it is likely faster to batch the processing.
#
# Which approach is better? I have no idea. Right now the NOBLOCK
# paths in _xsafe_send and _xsafe_recv check getsockopt(zmq.EVENTS)
# and the other paths always wake the other blocked thread. It's done
# this way only because it was convenient to implement, not based on
# any benchmarks.
# Which approach is better? I have no idea.
#
# TODO:
# - Ensure that recv* and send* methods raise error when called on a
# closed socket. They should not block.
# - Return correct message tracker from send* methods
# - Make MessageTracker.wait zmq friendly
# - Support MessageTrackers and make MessageTracker.wait green
# - What should happen to threads blocked on send/recv when socket is
# closed?
@@ -213,8 +195,8 @@ class Socket(_Socket):
def __init__(self, context, socket_type):
_Socket.__init__(self, context, socket_type)
self._send_event = _SimpleEvent()
self._recv_event = _SimpleEvent()
self._send_event = _BlockedThread()
self._recv_event = _BlockedThread()
# customize send and recv methods based on socket type
ops = self._eventlet_ops.get(socket_type)
@@ -268,6 +250,11 @@ class Socket(_Socket):
flags |= NOBLOCK
while True:
# Avoid recreating a Message object each time Socket.send
# is called. TODO: Not sure if this is benefitial or not.
#if not copy and not track and type(msg) is str:
# msg = Message(msg)
try:
return _Socket_send(self, msg, flags, copy, track)
except ZMQError, e:

View File

@@ -336,6 +336,26 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
final_i = done.wait()
self.assertEqual(final_i, 0)
@skip_unless(zmq_supported)
def test_close_during_recv(self):
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
sleep()
done1 = event.Event()
done2 = event.Event()
def rx(e):
self.assertRaisesErrno(zmq.ENOTSUP, receiver.recv)
e.send()
spawn(rx, done1)
spawn(rx, done2)
sleep()
receiver.close()
done1.wait()
done2.wait()
class TestQueueLock(LimitedTestCase):
@skip_unless(zmq_supported)
def test_queue_lock_order(self):
@@ -416,10 +436,10 @@ class TestQueueLock(LimitedTestCase):
s.acquire()
self.assertEquals(results, [1])
class TestSimpleEvent(LimitedTestCase):
class TestBlockedThread(LimitedTestCase):
@skip_unless(zmq_supported)
def test_block(self):
e = zmq._SimpleEvent()
e = zmq._BlockedThread()
done = event.Event()
self.assertFalse(e)
@@ -433,35 +453,3 @@ class TestSimpleEvent(LimitedTestCase):
self.assertFalse(done.has_result())
e.wake()
done.wait()
@skip_unless(zmq_supported)
def test_enter_exit(self):
e = zmq._SimpleEvent()
done = event.Event()
self.assertFalse(e)
def block():
with e:
get_hub().switch()
done.send(1)
gt = spawn(block)
sleep()
self.assertFalse(done.has_result())
get_hub().schedule_call_global(0, gt.switch)
done.wait()
@skip_unless(zmq_supported)
def test_error(self):
e1 = zmq._SimpleEvent()
with e1:
with self.assertRaises(Exception):
with e1:
pass
e2 = zmq._SimpleEvent()
with e2:
with self.assertRaises(Exception):
e2.block()