diff --git a/eventlet/coros.py b/eventlet/coros.py index 86b77ac..c8784be 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -22,6 +22,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import collections import time import traceback @@ -174,3 +175,78 @@ class pipe(object): buf, self._buffer = self._buffer[:num], self._buffer[num:] return buf + +class Actor(object): + """ A free-running coroutine that accepts and processes messages. + + Kind of the equivalent of an Erlang process, really. It processes + a queue of messages in the order that they were sent. You must + subclass this and implement your own version of receive(). + + The actor's reference count will never drop to zero while the + coroutine exists; if you lose all references to the actor object + it will never be freed. + """ + def __init__(self): + """ Constructs an Actor, kicking off a new coroutine to process the messages. """ + self._mailbox = collections.deque() + self._event = event() + self._killer = api.spawn(self.run_forever) + + def run_forever(self): + """ Loops forever, continually checking the mailbox. """ + while True: + if not self._mailbox: + self._event.wait() + self._event.reset() + else: + # leave the message in the mailbox until after it's + # been processed so the event doesn't get triggered + # while in the received method + self.received(self._mailbox[0]) + self._mailbox.popleft() + + def cast(self, message): + """ Send a message to the actor. + + If the actor is busy, the message will be enqueued for later + consumption. There is no return value. + + >>> a = Actor() + >>> a.received = lambda msg: msg + >>> a.cast("hello") + """ + self._mailbox.append(message) + # if this is the only message, the coro could be waiting + if len(self._mailbox) == 1: + self._event.send() + + def received(self, message): + """ Called to process each incoming message. + + The default implementation just raises an exception, so + replace it with something useful! + + >>> class Greeter(Actor): + ... def received(self, message): + ... print "received", message + ... + >>> a = Greeter() + >>> a.cast("message 1") + >>> api.sleep(0) # need to explicitly yield to cause the actor to run + received message 1 + >>> a.cast("message 2") + >>> a.cast("message 3") + >>> api.sleep(0) + received message 2 + received message 3 + """ + raise NotImplementedError() + +def _test(): + print "Running doctests. There will be no further output if they succeed." + import doctest + doctest.testmod() + +if __name__ == "__main__": + _test() diff --git a/eventlet/coros_test.py b/eventlet/coros_test.py index ee15dd0..0675c3e 100644 --- a/eventlet/coros_test.py +++ b/eventlet/coros_test.py @@ -152,5 +152,68 @@ class TestCoroutinePool(tests.TestCase): api.sleep(0) self.assertEquals(t.cancelled, True) +class IncrActor(coros.Actor): + def received(self, message): + self.value = getattr(self, 'value', 0) + 1 + +class TestActor(tests.TestCase): + mode = 'static' + def setUp(self): + # raise an exception if we're waiting forever + self._cancel_timeout = api.exc_after(1, RuntimeError()) + self.actor = IncrActor() + + def tearDown(self): + self._cancel_timeout.cancel() + api.kill(self.actor._killer) + + def test_cast(self): + self.actor.cast(1) + api.sleep(0) + self.assertEqual(self.actor.value, 1) + self.actor.cast(1) + api.sleep(0) + self.assertEqual(self.actor.value, 2) + + def test_cast_multi_1(self): + # make sure that both messages make it in there + self.actor.cast(1) + self.actor.cast(1) + api.sleep(0) + self.assertEqual(self.actor.value, 2) + + def test_cast_multi_2(self): + # the actor goes through a slightly different code path if it + # is forced to enter its event loop prior to any cast()s + api.sleep(0) + self.test_cast_multi_1() + + def test_sleeping_during_received(self): + # ensure that even if the received method cooperatively + # yields, eventually all messages are delivered + msgs = [] + waiters = [] + def received(message): + evt = coros.event() + waiters.append(evt) + api.sleep(0) + msgs.append(message) + evt.send() + self.actor.received = received + + self.actor.cast(1) + api.sleep(0) + self.actor.cast(2) + self.actor.cast(3) + api.sleep(0) + self.actor.cast(4) + self.actor.cast(5) + for evt in waiters: + evt.wait() + self.assertEqual(msgs, [1,2,3,4,5]) + + + + if __name__ == '__main__': tests.main()