Fix zmq._QueueLock test in Python2.6 - TestCase.assertRaises is not context manager yet

This commit is contained in:
Sergey Shepelev
2012-12-11 17:22:34 +04:00
parent 0e1014d5d4
commit 2879b70781
2 changed files with 17 additions and 9 deletions

View File

@@ -13,6 +13,11 @@ slurp_properties(__zmq__, globals(), ignore=__patched__)
from collections import deque from collections import deque
class LockReleaseError(Exception):
pass
class _QueueLock(object): class _QueueLock(object):
"""A Lock that can be acquired by at most one thread. Any other """A Lock that can be acquired by at most one thread. Any other
thread calling acquire will be blocked in a queue. When release thread calling acquire will be blocked in a queue. When release
@@ -50,7 +55,7 @@ class _QueueLock(object):
def release(self): def release(self):
if self._count <= 0: if self._count <= 0:
raise Exception("Cannot release unacquired lock") raise LockReleaseError("Cannot release unacquired lock")
self._count -= 1 self._count -= 1
if self._count == 0: if self._count == 0:
@@ -59,6 +64,7 @@ class _QueueLock(object):
# wake next # wake next
self._hub.schedule_call_global(0, self._waiters[0].switch) self._hub.schedule_call_global(0, self._waiters[0].switch)
class _BlockedThread(object): class _BlockedThread(object):
"""Is either empty, or represents a single blocked thread that """Is either empty, or represents a single blocked thread that
blocked itself by calling the block() method. The thread can be blocked itself by calling the block() method. The thread can be

View File

@@ -12,6 +12,7 @@ try:
except ImportError: except ImportError:
zmq = {} # for systems lacking zmq, skips tests instead of barfing zmq = {} # for systems lacking zmq, skips tests instead of barfing
def zmq_supported(_): def zmq_supported(_):
try: try:
import zmq import zmq
@@ -19,22 +20,24 @@ def zmq_supported(_):
return False return False
return not using_pyevent(_) return not using_pyevent(_)
class TestUpstreamDownStream(LimitedTestCase):
class TestUpstreamDownStream(LimitedTestCase):
@skip_unless(zmq_supported)
def setUp(self): def setUp(self):
super(TestUpstreamDownStream, self).setUp() super(TestUpstreamDownStream, self).setUp()
self.context = zmq.Context()
self.sockets = [] self.sockets = []
@skip_unless(zmq_supported)
def tearDown(self): def tearDown(self):
self.clear_up_sockets() self.clear_up_sockets()
super(TestUpstreamDownStream, self).tearDown() super(TestUpstreamDownStream, self).tearDown()
def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'): def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'):
"""Create a bound socket pair using a random port.""" """Create a bound socket pair using a random port."""
self.context = context = zmq.Context() s1 = self.context.socket(type1)
s1 = context.socket(type1)
port = s1.bind_to_random_port(interface) port = s1.bind_to_random_port(interface)
s2 = context.socket(type2) s2 = self.context.socket(type2)
s2.connect('%s:%s' % (interface, port)) s2.connect('%s:%s' % (interface, port))
self.sockets.append(s1) self.sockets.append(s1)
self.sockets.append(s2) self.sockets.append(s2)
@@ -436,14 +439,12 @@ class TestQueueLock(LimitedTestCase):
def test_errors(self): def test_errors(self):
q = zmq._QueueLock() q = zmq._QueueLock()
with self.assertRaises(Exception): self.assertRaises(zmq.LockReleaseError, q.release)
q.release()
q.acquire() q.acquire()
q.release() q.release()
with self.assertRaises(Exception): self.assertRaises(zmq.LockReleaseError, q.release)
q.release()
@skip_unless(zmq_supported) @skip_unless(zmq_supported)
def test_nested_acquire(self): def test_nested_acquire(self):
@@ -471,6 +472,7 @@ class TestQueueLock(LimitedTestCase):
s.acquire() s.acquire()
self.assertEquals(results, [1]) self.assertEquals(results, [1])
class TestBlockedThread(LimitedTestCase): class TestBlockedThread(LimitedTestCase):
@skip_unless(zmq_supported) @skip_unless(zmq_supported)
def test_block(self): def test_block(self):