add tests for zmq._SimpleEvent and zmq._QueueLock

This commit is contained in:
Geoff Salmon
2011-09-14 22:31:59 -04:00
parent e71bfc5f80
commit 8d69bc5d92
2 changed files with 134 additions and 3 deletions

View File

@@ -47,7 +47,8 @@ class _QueueLock(object):
self._count += 1
def release(self):
assert self._count > 0, 'Cannot release unacquired lock'
if self._count <= 0:
raise Exception("Cannot release unacquired lock")
self._count -= 1
if self._count == 0:
@@ -75,7 +76,8 @@ class _SimpleEvent(object):
hubs.get_hub().switch()
def __enter__(self):
assert self._blocked_thread is None, 'Cannot block more than one thread on one SimpleEvent'
if self._blocked_thread is not None:
raise Exception("Cannot block more than one thread on one SimpleEvent")
self._blocked_thread = greenlet.getcurrent()
def __exit__(self, type, value, traceback):

View File

@@ -1,4 +1,4 @@
from eventlet import event, spawn, sleep, patcher
from eventlet import event, spawn, sleep, patcher, semaphore
from eventlet.hubs import get_hub, _threadlocal, use_hub
from nose.tools import *
from tests import mock, LimitedTestCase, using_pyevent, skip_unless
@@ -342,3 +342,132 @@ class TestThreadedContextAccess(TestCase):
sleep(0.1)
self.assertFalse(test_result[0])
class TestQueueLock(LimitedTestCase):
@skip_unless(zmq_supported)
def test_queue_lock_order(self):
q = zmq._QueueLock()
s = semaphore.Semaphore(0)
results = []
def lock(x):
with q:
results.append(x)
s.release()
q.acquire()
spawn(lock, 1)
sleep()
spawn(lock, 2)
sleep()
spawn(lock, 3)
sleep()
self.assertEquals(results, [])
q.release()
s.acquire()
s.acquire()
s.acquire()
self.assertEquals(results, [1,2,3])
@skip_unless(zmq_supported)
def test_count(self):
q = zmq._QueueLock()
self.assertFalse(q)
q.acquire()
self.assertTrue(q)
q.release()
self.assertFalse(q)
with q:
self.assertTrue(q)
self.assertFalse(q)
@skip_unless(zmq_supported)
def test_errors(self):
q = zmq._QueueLock()
with self.assertRaises(Exception):
q.release()
q.acquire()
q.release()
with self.assertRaises(Exception):
q.release()
@skip_unless(zmq_supported)
def test_nested_acquire(self):
q = zmq._QueueLock()
self.assertFalse(q)
q.acquire()
q.acquire()
s = semaphore.Semaphore(0)
results = []
def lock(x):
with q:
results.append(x)
s.release()
spawn(lock, 1)
sleep()
self.assertEquals(results, [])
q.release()
sleep()
self.assertEquals(results, [])
self.assertTrue(q)
q.release()
s.acquire()
self.assertEquals(results, [1])
class TestSimpleEvent(LimitedTestCase):
@skip_unless(zmq_supported)
def test_block(self):
e = zmq._SimpleEvent()
done = event.Event()
self.assertFalse(e)
def block():
e.block()
done.send(1)
spawn(block)
sleep()
self.assertFalse(done.has_result())
e.wake()
done.wait()
@skip_unless(zmq_supported)
def test_enter_exit(self):
e = zmq._SimpleEvent()
done = event.Event()
self.assertFalse(e)
def block():
with e:
get_hub().switch()
done.send(1)
gt = spawn(block)
sleep()
self.assertFalse(done.has_result())
get_hub().schedule_call_global(0, gt.switch)
done.wait()
@skip_unless(zmq_supported)
def test_error(self):
e1 = zmq._SimpleEvent()
with e1:
with self.assertRaises(Exception):
with e1:
pass
e2 = zmq._SimpleEvent()
with e2:
with self.assertRaises(Exception):
e2.block()