[svn r78] Added actor class (a free-running message receiver as discussed here: http://lists.secondlife.com/pipermail/chttpdev/2007-December/000042.html) and some tests.
This commit is contained in:
@@ -22,6 +22,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|||||||
THE SOFTWARE.
|
THE SOFTWARE.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import collections
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
@@ -174,3 +175,78 @@ class pipe(object):
|
|||||||
buf, self._buffer = self._buffer[:num], self._buffer[num:]
|
buf, self._buffer = self._buffer[:num], self._buffer[num:]
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
|
|
||||||
|
class Actor(object):
|
||||||
|
""" A free-running coroutine that accepts and processes messages.
|
||||||
|
|
||||||
|
Kind of the equivalent of an Erlang process, really. It processes
|
||||||
|
a queue of messages in the order that they were sent. You must
|
||||||
|
subclass this and implement your own version of receive().
|
||||||
|
|
||||||
|
The actor's reference count will never drop to zero while the
|
||||||
|
coroutine exists; if you lose all references to the actor object
|
||||||
|
it will never be freed.
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
""" Constructs an Actor, kicking off a new coroutine to process the messages. """
|
||||||
|
self._mailbox = collections.deque()
|
||||||
|
self._event = event()
|
||||||
|
self._killer = api.spawn(self.run_forever)
|
||||||
|
|
||||||
|
def run_forever(self):
|
||||||
|
""" Loops forever, continually checking the mailbox. """
|
||||||
|
while True:
|
||||||
|
if not self._mailbox:
|
||||||
|
self._event.wait()
|
||||||
|
self._event.reset()
|
||||||
|
else:
|
||||||
|
# leave the message in the mailbox until after it's
|
||||||
|
# been processed so the event doesn't get triggered
|
||||||
|
# while in the received method
|
||||||
|
self.received(self._mailbox[0])
|
||||||
|
self._mailbox.popleft()
|
||||||
|
|
||||||
|
def cast(self, message):
|
||||||
|
""" Send a message to the actor.
|
||||||
|
|
||||||
|
If the actor is busy, the message will be enqueued for later
|
||||||
|
consumption. There is no return value.
|
||||||
|
|
||||||
|
>>> a = Actor()
|
||||||
|
>>> a.received = lambda msg: msg
|
||||||
|
>>> a.cast("hello")
|
||||||
|
"""
|
||||||
|
self._mailbox.append(message)
|
||||||
|
# if this is the only message, the coro could be waiting
|
||||||
|
if len(self._mailbox) == 1:
|
||||||
|
self._event.send()
|
||||||
|
|
||||||
|
def received(self, message):
|
||||||
|
""" Called to process each incoming message.
|
||||||
|
|
||||||
|
The default implementation just raises an exception, so
|
||||||
|
replace it with something useful!
|
||||||
|
|
||||||
|
>>> class Greeter(Actor):
|
||||||
|
... def received(self, message):
|
||||||
|
... print "received", message
|
||||||
|
...
|
||||||
|
>>> a = Greeter()
|
||||||
|
>>> a.cast("message 1")
|
||||||
|
>>> api.sleep(0) # need to explicitly yield to cause the actor to run
|
||||||
|
received message 1
|
||||||
|
>>> a.cast("message 2")
|
||||||
|
>>> a.cast("message 3")
|
||||||
|
>>> api.sleep(0)
|
||||||
|
received message 2
|
||||||
|
received message 3
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def _test():
|
||||||
|
print "Running doctests. There will be no further output if they succeed."
|
||||||
|
import doctest
|
||||||
|
doctest.testmod()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
_test()
|
||||||
|
@@ -152,5 +152,68 @@ class TestCoroutinePool(tests.TestCase):
|
|||||||
api.sleep(0)
|
api.sleep(0)
|
||||||
self.assertEquals(t.cancelled, True)
|
self.assertEquals(t.cancelled, True)
|
||||||
|
|
||||||
|
class IncrActor(coros.Actor):
|
||||||
|
def received(self, message):
|
||||||
|
self.value = getattr(self, 'value', 0) + 1
|
||||||
|
|
||||||
|
class TestActor(tests.TestCase):
|
||||||
|
mode = 'static'
|
||||||
|
def setUp(self):
|
||||||
|
# raise an exception if we're waiting forever
|
||||||
|
self._cancel_timeout = api.exc_after(1, RuntimeError())
|
||||||
|
self.actor = IncrActor()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self._cancel_timeout.cancel()
|
||||||
|
api.kill(self.actor._killer)
|
||||||
|
|
||||||
|
def test_cast(self):
|
||||||
|
self.actor.cast(1)
|
||||||
|
api.sleep(0)
|
||||||
|
self.assertEqual(self.actor.value, 1)
|
||||||
|
self.actor.cast(1)
|
||||||
|
api.sleep(0)
|
||||||
|
self.assertEqual(self.actor.value, 2)
|
||||||
|
|
||||||
|
def test_cast_multi_1(self):
|
||||||
|
# make sure that both messages make it in there
|
||||||
|
self.actor.cast(1)
|
||||||
|
self.actor.cast(1)
|
||||||
|
api.sleep(0)
|
||||||
|
self.assertEqual(self.actor.value, 2)
|
||||||
|
|
||||||
|
def test_cast_multi_2(self):
|
||||||
|
# the actor goes through a slightly different code path if it
|
||||||
|
# is forced to enter its event loop prior to any cast()s
|
||||||
|
api.sleep(0)
|
||||||
|
self.test_cast_multi_1()
|
||||||
|
|
||||||
|
def test_sleeping_during_received(self):
|
||||||
|
# ensure that even if the received method cooperatively
|
||||||
|
# yields, eventually all messages are delivered
|
||||||
|
msgs = []
|
||||||
|
waiters = []
|
||||||
|
def received(message):
|
||||||
|
evt = coros.event()
|
||||||
|
waiters.append(evt)
|
||||||
|
api.sleep(0)
|
||||||
|
msgs.append(message)
|
||||||
|
evt.send()
|
||||||
|
self.actor.received = received
|
||||||
|
|
||||||
|
self.actor.cast(1)
|
||||||
|
api.sleep(0)
|
||||||
|
self.actor.cast(2)
|
||||||
|
self.actor.cast(3)
|
||||||
|
api.sleep(0)
|
||||||
|
self.actor.cast(4)
|
||||||
|
self.actor.cast(5)
|
||||||
|
for evt in waiters:
|
||||||
|
evt.wait()
|
||||||
|
self.assertEqual(msgs, [1,2,3,4,5])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
tests.main()
|
tests.main()
|
||||||
|
Reference in New Issue
Block a user