[svn r101] Actor is extended to be able to process multiple messages concurrently. The default is 1, naturally. The tests had to be changed substantially because the additional handoff associated with using the coroutine pool made single api.sleep(0) insufficient to force an execution of the received method. Also switched to a deque in pools.py since that maps more closely to the problem domain. Reviewed by jonathan and donovan.

This commit is contained in:
which.linden
2008-03-10 22:26:32 -04:00
parent aca5baa34e
commit 5cbe01ad50
3 changed files with 89 additions and 77 deletions

View File

@@ -279,6 +279,8 @@ class CoroutinePool(pools.Pool):
print "GreenletExit raised in coroutine pool", e
if evt is not None:
evt.send(e) # sent as a return value, not an exception
except KeyboardInterrupt:
raise # allow program to exit
except Exception, e:
traceback.print_exc()
if evt is not None:
@@ -372,11 +374,17 @@ class Actor(object):
coroutine exists; if you lose all references to the actor object
it will never be freed.
"""
def __init__(self):
""" Constructs an Actor, kicking off a new coroutine to process the messages. """
def __init__(self, concurrency = 1):
""" Constructs an Actor, kicking off a new coroutine to process the messages.
The concurrency argument specifies how many messages the actor will try
to process concurrently. If it is 1, the actor will process messages
serially.
"""
self._mailbox = collections.deque()
self._event = event()
self._killer = api.spawn(self.run_forever)
self._pool = CoroutinePool(min_size=0, max_size=concurrency)
def run_forever(self):
""" Loops forever, continually checking the mailbox. """
@@ -388,17 +396,8 @@ class Actor(object):
# leave the message in the mailbox until after it's
# been processed so the event doesn't get triggered
# while in the received method
try:
self.received(self._mailbox[0])
except KeyboardInterrupt:
raise # allow the program to quit
except:
# we don't want to let the exception escape this
# loop because that would kill the coroutine
e = sys.exc_info()[0]
self.excepted(e)
sys.exc_clear()
self._pool.execute_async(
self.received, self._mailbox[0])
self._mailbox.popleft()
def cast(self, message):
@@ -423,16 +422,24 @@ class Actor(object):
replace it with something useful!
>>> class Greeter(Actor):
... def received(self, message):
... def received(self, (message, evt) ):
... print "received", message
... if evt: evt.send()
...
>>> a = Greeter()
>>> a.cast("message 1")
>>> api.sleep(0) # need to explicitly yield to cause the actor to run
This example uses events to synchronize between the actor and the main
coroutine in a predictable manner, but this kinda defeats the point of
the Actor, so don't do it in a real application.
>>> evt = event()
>>> a.cast( ("message 1", evt) )
>>> evt.wait() # force it to run at this exact moment
received message 1
>>> a.cast("message 2")
>>> a.cast("message 3")
>>> api.sleep(0)
>>> evt.reset()
>>> a.cast( ("message 2", None) )
>>> a.cast( ("message 3", evt) )
>>> evt.wait()
received message 2
received message 3
@@ -440,40 +447,6 @@ class Actor(object):
"""
raise NotImplementedError()
def excepted(self, exc):
""" Called when the received method raises an exception.
The default implementation simply prints out the raised exception.
Redefine it for customization.
>>> class Exceptor(Actor):
... def received(self, message):
... if message == 'fail':
... message + 1
... else:
... print "received", message
... def excepted(self, exc):
... # printing out exc varies per version of Python
... print "excepted"
>>> a = Exceptor()
>>> a.cast('fail')
>>> api.sleep(0)
excepted
The main purpose of excepted is to prevent the actor's coroutine
from dying.
>>> a.cast('message 2')
>>> api.sleep(0)
received message 2
If excepted() itself raises an exception, that will kill the coroutine.
>>> api.kill(a._killer) # test cleanup
"""
print "Exception in %s.received(): %s" % (
type(self).__name__, exc)
traceback.print_exc()
def _test():
print "Running doctests. There will be no further output if they succeed."

View File

@@ -181,8 +181,9 @@ class TestCoroutinePool(tests.TestCase):
class IncrActor(coros.Actor):
def received(self, message):
def received(self, evt):
self.value = getattr(self, 'value', 0) + 1
if evt: evt.send()
class TestActor(tests.TestCase):
mode = 'static'
@@ -196,18 +197,23 @@ class TestActor(tests.TestCase):
api.kill(self.actor._killer)
def test_cast(self):
self.actor.cast(1)
api.sleep(0)
evt = coros.event()
self.actor.cast(evt)
evt.wait()
evt.reset()
self.assertEqual(self.actor.value, 1)
self.actor.cast(1)
api.sleep(0)
self.actor.cast(evt)
evt.wait()
self.assertEqual(self.actor.value, 2)
def test_cast_multi_1(self):
# make sure that both messages make it in there
self.actor.cast(1)
self.actor.cast(1)
api.sleep(0)
evt = coros.event()
evt1 = coros.event()
self.actor.cast(evt)
self.actor.cast(evt1)
evt.wait()
evt1.wait()
self.assertEqual(self.actor.value, 2)
def test_cast_multi_2(self):
@@ -221,21 +227,24 @@ class TestActor(tests.TestCase):
# yields, eventually all messages are delivered
msgs = []
waiters = []
def received(message):
evt = coros.event()
waiters.append(evt)
def received( (message, evt) ):
api.sleep(0)
msgs.append(message)
evt.send()
self.actor.received = received
self.actor.cast(1)
waiters.append(coros.event())
self.actor.cast( (1, waiters[-1]))
api.sleep(0)
self.actor.cast(2)
self.actor.cast(3)
waiters.append(coros.event())
self.actor.cast( (2, waiters[-1]) )
waiters.append(coros.event())
self.actor.cast( (3, waiters[-1]) )
api.sleep(0)
self.actor.cast(4)
self.actor.cast(5)
waiters.append(coros.event())
self.actor.cast( (4, waiters[-1]) )
waiters.append(coros.event())
self.actor.cast( (5, waiters[-1]) )
for evt in waiters:
evt.wait()
self.assertEqual(msgs, [1,2,3,4,5])
@@ -243,20 +252,49 @@ class TestActor(tests.TestCase):
def test_raising_received(self):
msgs = []
def received(message):
def received( (message, evt) ):
evt.send()
if message == 'fail':
raise RuntimeError()
else:
msgs.append(message)
self.actor.received = received
self.actor.excepted = lambda x: None
self.actor.cast('fail')
api.sleep(0)
self.actor.cast('should_appear')
api.sleep(0)
evt = coros.event()
self.actor.cast( ('fail', evt) )
evt.wait()
evt.reset()
self.actor.cast( ('should_appear', evt) )
evt.wait()
self.assertEqual(['should_appear'], msgs)
def test_multiple(self):
self.actor = IncrActor(concurrency=2)
total = [0]
def received( (func, ev, value) ):
func()
total[0] += value
ev.send()
self.actor.received = received
def onemoment():
api.sleep(0.1)
evt = coros.event()
evt1 = coros.event()
self.actor.cast( (onemoment, evt, 1) )
self.actor.cast( (lambda: None, evt1, 2) )
evt1.wait()
self.assertEqual(total[0], 2)
# both coroutines should have been used
self.assertEqual(self.actor._pool.current_size, 2)
self.assertEqual(self.actor._pool.free(), 1)
evt.wait()
self.assertEqual(total[0], 3)
self.assertEqual(self.actor._pool.free(), 2)
if __name__ == '__main__':
tests.main()

View File

@@ -22,6 +22,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import collections
import os
import socket
@@ -57,7 +58,7 @@ class Pool(object):
self.max_size = max_size
self.current_size = 0
self.channel = channel.channel()
self.free_items = []
self.free_items = collections.deque()
for x in range(min_size):
self.current_size += 1
self.free_items.append(self.create())
@@ -66,7 +67,7 @@ class Pool(object):
"""Return an item from the pool, when one is available
"""
if self.free_items:
return self.free_items.pop(0)
return self.free_items.popleft()
if self.current_size < self.max_size:
self.current_size += 1
return self.create()