623 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			623 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # @author Donovan Preston
 | |
| # 
 | |
| # Copyright (c) 2007, Linden Research, Inc.
 | |
| # Permission is hereby granted, free of charge, to any person obtaining a copy
 | |
| # of this software and associated documentation files (the "Software"), to deal
 | |
| # in the Software without restriction, including without limitation the rights
 | |
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 | |
| # copies of the Software, and to permit persons to whom the Software is
 | |
| # furnished to do so, subject to the following conditions:
 | |
| # 
 | |
| # The above copyright notice and this permission notice shall be included in
 | |
| # all copies or substantial portions of the Software.
 | |
| # 
 | |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | |
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | |
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 | |
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 | |
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 | |
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 | |
| # THE SOFTWARE.
 | |
| 
 | |
| import collections
 | |
| import time
 | |
| 
 | |
| from eventlet import api
 | |
| 
 | |
| 
 | |
class Cancelled(RuntimeError):
    """Raised inside a coroutine whose pending ``event.wait()`` was cancelled.

    ``event.cancel()`` schedules an instance of this exception to be thrown
    into the waiting greenlet.
    """
 | |
| 
 | |
| 
 | |
class NOT_USED:
    """Type of the sentinel that marks an event as not yet triggered.

    A dedicated object is used (rather than None) so that None remains a
    legal value to send through an event.
    """

    def __repr__(self):
        # Show the sentinel by name when it appears in an event's str().
        return 'NOT_USED'


# Rebind the name to the single instance; the class itself is never needed
# again, and identity comparisons (``x is NOT_USED``) use this object.
NOT_USED = NOT_USED()
 | |
| 
 | |
class event(object):
    """A one-shot rendezvous: any number of coroutines can wait for a
    single value (or exception) supplied by another coroutine.

    Events differ from channels in two ways:
      1) calling send() does not unschedule the current coroutine
      2) send() can only be called once; use reset() to prepare the event for
         another send()
    They are ideal for communicating return values between coroutines.

    >>> from eventlet import coros, api
    >>> evt = coros.event()
    >>> def baz(b):
    ...     evt.send(b + 1)
    ...
    >>> _ = api.spawn(baz, 3)
    >>> evt.wait()
    4
    """
    # Class-level default that is deliberately *not* NOT_USED, so the reset()
    # issued from __init__ gets past its "fresh event" assertion.
    _result = None

    def __init__(self):
        # Maps each greenlet currently blocked in wait() to True.
        self._waiters = {}
        self.reset()

    def __str__(self):
        return '<%s at %s result=%r _exc=%r _waiters[%d]>' % (
            self.__class__.__name__,
            hex(id(self)),
            self._result,
            self._exc,
            len(self._waiters),
        )

    def reset(self):
        """ Put the event back into the untriggered state so that send()
        may be called again.  Only legal once send() has been called.

        >>> from eventlet import coros
        >>> evt = coros.event()
        >>> evt.send(1)
        >>> evt.reset()
        >>> evt.send(2)
        >>> evt.wait()
        2

        Calling reset multiple times in a row is an error.

        >>> evt.reset()
        >>> evt.reset()
        Traceback (most recent call last):
        ...
        AssertionError: Trying to re-reset() a fresh event.

        """
        assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
        self._exc = None
        self._result = NOT_USED
        # Timestamp of the most recent reset.
        self.epoch = time.time()

    def ready(self):
        """ Tell whether wait() would return (or raise) without blocking.

        Handy for avoiding a wait on something that may take long to time
        out: keep a list of events, repeatedly ask each one ready(), and
        wait() only on the first that answers True."""
        return not (self._result is NOT_USED)

    def has_exception(self):
        """Return True if an exception has been sent to this event."""
        return self._exc is not None

    def has_result(self):
        """Return True if the event was triggered with a plain result, i.e.
        it is ready and no exception was sent."""
        if self._exc is not None:
            return False
        return self._result is not NOT_USED

    def poll(self, notready=None):
        """Non-blocking wait(): return `notready` unless the event is ready."""
        return self.wait() if self.ready() else notready

    # QQQ make it return tuple (type, value, tb) instead of raising
    # because
    # 1) "poll" does not imply raising
    # 2) it's better not to screw up caller's sys.exc_info() by default
    #    (e.g. if caller wants to call the function in except or finally)
    def poll_exception(self, notready=None):
        """Go through wait() (which raises the stored exception) if an
        exception was sent; otherwise return `notready`."""
        return self.wait() if self.has_exception() else notready

    def poll_result(self, notready=None):
        """Return the sent value if the event holds a plain result;
        otherwise return `notready`."""
        return self.wait() if self.has_result() else notready

    def wait(self):
        """Block until another coroutine calls send(), then return the
        value that was passed to send().

        >>> from eventlet import coros, api
        >>> evt = coros.event()
        >>> def wait_on():
        ...    retval = evt.wait()
        ...    print "waited for", retval
        >>> _ = api.spawn(wait_on)
        >>> evt.send('result')
        >>> api.sleep(0)
        waited for result

        Returns immediately if the event has already
        occurred.

        >>> evt.wait()
        'result'
        """
        if self._result is NOT_USED:
            me = api.getcurrent()
            self._waiters[me] = True
            try:
                # The value (or exception) arrives through the switch made
                # by _do_send() / cancel().
                return api.get_hub().switch()
            finally:
                # Always unregister, whether we were woken normally or by
                # an exception thrown into this greenlet.
                self._waiters.pop(me, None)
        if self._exc is None:
            return self._result
        api.getcurrent().throw(*self._exc)
        return self._result

    def cancel(self, waiter):
        """Raise an exception into a coroutine which called
        wait() on this event instead of returning a value
        from wait. Sends the eventlet.coros.Cancelled
        exception

        waiter: The greenlet (greenlet.getcurrent()) of the
            coroutine to cancel

        >>> from eventlet import coros, api
        >>> evt = coros.event()
        >>> def wait_on():
        ...    try:
        ...        print "received " + evt.wait()
        ...    except coros.Cancelled, c:
        ...        print "Cancelled"
        ...
        >>> waiter = api.spawn(wait_on)

        The cancel call works on coroutines that are in the wait() call.

        >>> api.sleep(0)  # enter the wait()
        >>> evt.cancel(waiter)
        >>> api.sleep(0)  # receive the exception
        Cancelled

        The cancel is invisible to coroutines that call wait() after cancel()
        is called.  This is different from send()'s behavior, where the result
        is passed to any waiter regardless of the ordering of the calls.

        >>> waiter = api.spawn(wait_on)
        >>> api.sleep(0)

        Cancels have no effect on the ability to send() to the event.

        >>> evt.send('stuff')
        >>> api.sleep(0)
        received stuff
        """
        if waiter not in self._waiters:
            # Not (or no longer) waiting on this event: nothing to cancel.
            return
        del self._waiters[waiter]
        # XXX This does not check that waiter still waits when throw actually happens
        # XXX and therefore is broken (see how send() deals with this)
        api.get_hub().schedule_call(0, waiter.throw, Cancelled())

    def send(self, result=None, exc=None):
        """Makes arrangements for the waiters to be woken with the
        result and then returns immediately to the parent.

        >>> from eventlet import coros, api
        >>> evt = coros.event()
        >>> def waiter():
        ...     print 'about to wait'
        ...     result = evt.wait()
        ...     print 'waited for', result
        >>> _ = api.spawn(waiter)
        >>> api.sleep(0)
        about to wait
        >>> evt.send('a')
        >>> api.sleep(0)
        waited for a

        It is an error to call send() multiple times on the same event.

        >>> evt.send('whoops')
        Traceback (most recent call last):
        ...
        AssertionError: Trying to re-send() an already-triggered event.

        Use reset() between send()s to reuse an event object.
        """
        assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
        self._result = result
        # Normalise a lone exception into the argument tuple expected by
        # greenlet.throw().
        if not (exc is None or isinstance(exc, tuple)):
            exc = (exc, )
        self._exc = exc
        hub = api.get_hub()
        if self._waiters:
            # Hand _do_send() a private snapshot of the current waiters; it
            # consumes that list destructively.
            hub.schedule_call_global(0, self._do_send, self._result, self._exc, list(self._waiters))

    def _do_send(self, result, exc, waiters):
        """Wake every greenlet in `waiters` that is still registered,
        switching to it with `result` or throwing `exc` into it."""
        while waiters:
            candidate = waiters.pop()
            if candidate not in self._waiters:
                # It stopped waiting (e.g. was cancelled) after send().
                continue
            if exc is None:
                candidate.switch(result)
            else:
                candidate.throw(*exc)

    def send_exception(self, *args):
        # the arguments are the same as for greenlet.throw
        return self.send(None, args)
 | |
| 
 | |
| 
 | |
class Semaphore(object):
    """An unbounded semaphore.

    Optionally initialize with a resource count, then acquire() and release()
    resources as needed. Attempting to acquire() when count is zero suspends
    the calling coroutine until count becomes nonzero again.
    """

    def __init__(self, count=0):
        # Greenlets currently suspended inside acquire() (values unused).
        self._waiters = {}
        self.counter = count

    def __str__(self):
        return '<%s at %s counter=%r>' % (
            self.__class__.__name__, hex(id(self)), self.counter)

    def locked(self):
        """Return True when an acquire() would have to wait."""
        return self.counter <= 0

    def bounded(self):
        # for consistency with BoundedSemaphore
        return False

    def acquire(self, blocking=True):
        """Take one resource, suspending the caller while none is available.

        With blocking=False, return False immediately instead of waiting.
        Returns True once the resource has been taken.
        """
        if not blocking and self.locked():
            return False
        # Re-check after every wake-up: another coroutine may have taken the
        # resource before this one got to run.
        while self.counter <= 0:
            me = api.getcurrent()
            self._waiters[me] = None
            try:
                api.get_hub().switch()
            finally:
                self._waiters.pop(me, None)
        self.counter -= 1
        return True

    def __enter__(self):
        self.acquire()

    def release(self, blocking=True):
        """Give one resource back; never blocks and always returns True."""
        # `blocking' parameter is for consistency with BoundedSemaphore and is ignored
        self.counter += 1
        if self._waiters:
            # Wake a waiter from the hub rather than switching right here.
            api.get_hub().schedule_call_global(0, self._do_acquire)
        return True

    def _do_acquire(self):
        """Resume one arbitrary waiter if a resource is still available."""
        if not self._waiters or self.counter <= 0:
            return
        waiter, _unused = self._waiters.popitem()
        waiter.switch()

    def __exit__(self, typ, val, tb):
        self.release()
 | |
| 
 | |
| 
 | |
class BoundedSemaphore(object):
    """A bounded semaphore.

    Optionally initialize with a resource count, then acquire() and release()
    resources as needed. Attempting to acquire() when count is zero suspends
    the calling coroutine until count becomes nonzero again.  Attempting to
    release() after count has reached limit suspends the calling coroutine until
    count becomes less than limit again.

    Implemented as two unbounded semaphores: `lower_bound` counts available
    resources, `upper_bound` counts the remaining room below `limit`.
    """
    def __init__(self, count, limit):
        """Create the semaphore with `count` resources out of at most `limit`.

        Raises ValueError if `limit` is None or `count` exceeds `limit`.
        """
        # The None check is spelled out instead of relying on how the
        # interpreter happens to order an int against None.
        if limit is None or count > limit:
            raise ValueError("'count' cannot be more than 'limit'")
        self.lower_bound = Semaphore(count)
        self.upper_bound = Semaphore(limit-count)

    def __str__(self):
        params = (self.__class__.__name__, hex(id(self)), self.lower_bound.counter, self.upper_bound.counter)
        return '<%s at %s %r/%r>' % params

    def locked(self):
        """Return True when acquire() would have to wait."""
        return self.lower_bound.locked()

    def bounded(self):
        """Return True when release() would have to wait."""
        return self.upper_bound.locked()

    def acquire(self, blocking=True):
        """Take one resource, waiting for one if necessary.

        With blocking=False, return False immediately when none is
        available.  Otherwise returns the result of the underlying acquire.
        """
        if not blocking and self.locked():
            return False
        # Taking a resource frees one slot of room below the limit.
        self.upper_bound.release()
        try:
            return self.lower_bound.acquire()
        except:
            # Deliberately bare: undo the bookkeeping for *any* exception
            # thrown into the waiting coroutine, then propagate it.
            self.upper_bound.counter -= 1
            # using counter directly means that it can be less than zero.
            # however I certainly don't need to wait here and I don't seem to have
            # a need to care about such inconsistency
            raise

    def __enter__(self):
        self.acquire()

    def release(self, blocking=True):
        """Give one resource back, waiting for room below the limit.

        With blocking=False, return False immediately when the limit has
        been reached.  Otherwise returns the result of the underlying acquire.
        """
        if not blocking and self.bounded():
            return False
        self.lower_bound.release()
        try:
            return self.upper_bound.acquire()
        except:
            # Deliberately bare, mirroring acquire(): roll back and re-raise.
            self.lower_bound.counter -= 1
            raise

    def __exit__(self, typ, val, tb):
        self.release()

    @property
    def balance(self):
        """Available resources minus remaining room (lower minus upper)."""
        return self.lower_bound.counter - self.upper_bound.counter
 | |
| 
 | |
| 
 | |
def semaphore(count=0, limit=None):
    """Factory: build a BoundedSemaphore when `limit` is given, otherwise
    a plain unbounded Semaphore starting at `count`."""
    if limit is not None:
        return BoundedSemaphore(count, limit)
    return Semaphore(count)
 | |
| 
 | |
| 
 | |
class metaphore(object):
    """This is sort of an inverse semaphore: a counter that starts at 0 and
    waits only if nonzero. It's used to implement a "wait for all" scenario.

    >>> from eventlet import api, coros
    >>> count = coros.metaphore()
    >>> count.wait()
    >>> def decrementer(count, id):
    ...     print "%s decrementing" % id
    ...     count.dec()
    ...
    >>> _ = api.spawn(decrementer, count, 'A')
    >>> _ = api.spawn(decrementer, count, 'B')
    >>> count.inc(2)
    >>> count.wait()
    A decrementing
    B decrementing
    """
    def __init__(self):
        self.counter = 0
        self.event   = event()
        # send() right away, else we'd wait on the default 0 count!
        self.event.send()

    def inc(self, by=1):
        """Increment our counter. If this transitions the counter from zero to
        nonzero, make any subsequent wait() call wait.
        """
        assert by > 0
        self.counter += by
        if self.counter == by:
            # If we just incremented self.counter by 'by', and the new count
            # equals 'by', then the old value of self.counter was 0.
            # Transitioning from 0 to a nonzero value means wait() must
            # actually wait.
            self.event.reset()

    def dec(self, by=1):
        """Decrement our counter. If this transitions the counter from nonzero
        to zero, a current or subsequent wait() call need no longer wait.

        Decrementing past zero clamps the counter at zero; decrementing
        while the counter is already zero is a no-op.
        """
        assert by > 0
        self.counter -= by
        if self.counter <= 0:
            # Don't leave self.counter < 0, that will screw things up in
            # future calls.
            self.counter = 0
            # Transitioning from nonzero to 0 means wait() need no longer wait.
            # If the event is already triggered the counter was zero before
            # this call; sending again would trip the event's re-send
            # assertion, so only send on a real transition.
            if not self.event.ready():
                self.event.send()

    def wait(self):
        """Suspend the caller only if our count is nonzero. In that case,
        resume the caller once the count decrements to zero again.
        """
        self.event.wait()
 | |
| 
 | |
def execute(func, *args, **kw):
    """ Run func(*args, **kw) in a freshly spawned coroutine and return an
    event through which its return value can be retrieved.

    This has the same api as the CoroutinePool.execute method; the only
    difference is that this one creates a new coroutine instead of drawing
    from a pool.

    >>> from eventlet import coros
    >>> evt = coros.execute(lambda a: ('foo', a), 1)
    >>> evt.wait()
    ('foo', 1)
    """
    result_event = event()

    def _really_execute():
        # Runs inside the new coroutine; only a normal return value is
        # delivered to the event.
        outcome = func(*args, **kw)
        result_event.send(outcome)

    api.spawn(_really_execute)
    return result_event
 | |
| 
 | |
| 
 | |
def CoroutinePool(*args, **kwargs):
    """Construct an eventlet.pool.Pool, forwarding all arguments unchanged."""
    # Imported at call time rather than at module load.
    from eventlet.pool import Pool as _Pool
    return _Pool(*args, **kwargs)
 | |
| 
 | |
| 
 | |
class queue(object):
    """Cross-coroutine queue, using semaphore to synchronize.
    The API is like a generalization of event to be able to hold more than one
    item at a time (without reset() or cancel()).

    >>> from eventlet import coros
    >>> q = coros.queue(max_size=2)
    >>> def putter(q):
    ...     q.send("first")
    ...
    >>> _ = api.spawn(putter, q)
    >>> q.ready()
    False
    >>> q.wait()
    'first'
    >>> q.ready()
    False
    >>> q.send("second")
    >>> q.ready()
    True
    >>> q.send("third")
    >>> def getter(q):
    ...     print q.wait()
    ...
    >>> _ = api.spawn(getter, q)
    >>> q.send("fourth")
    second
    """
    def __init__(self, max_size=None):
        """Without max_size the queue will try to hold an unlimited number
        of items.
        With max_size, a send() attempted while the queue already holds
        max_size items suspends the calling coroutine until someone else
        retrieves one.
        """
        # The semaphore tracks the number of queued items and, when bounded,
        # is what makes send() wait.
        self.sem = semaphore(count=0, limit=max_size)
        self.items = collections.deque()

    def __nonzero__(self):
        return len(self.items)>0

    def __len__(self):
        return len(self.items)

    def __str__(self):
        return '<%s at %s sem=%s items[%d]>' % (
            self.__class__.__name__, hex(id(self)), self.sem, len(self.items))

    def send(self, result=None, exc=None):
        """Enqueue one item.  With send(exc=SomeExceptionClass) the matching
        wait() call raises that exception; otherwise the matching wait()
        returns result (default None).
        """
        # Normalise a lone exception into the argument tuple expected by
        # greenlet.throw().
        if not (exc is None or isinstance(exc, tuple)):
            exc = (exc, )
        entry = (result, exc)
        self.items.append(entry)
        self.sem.release()

    def send_exception(self, *args):
        # the arguments are the same as for greenlet.throw
        return self.send(exc=args)

    def wait(self):
        """Retrieve the next item sent by send(), in FIFO order, waiting for
        one if necessary.
        If the matching send() specified exc=SomeExceptionClass, this wait()
        raises that exception; otherwise it returns that send() call's
        result= parameter.
        """
        self.sem.acquire()
        result, exc = self.items.popleft()
        if exc is None:
            return result
        api.getcurrent().throw(*exc)
        return result

    def ready(self):
        # could also base this on self.sem.counter...
        return len(self.items) > 0
 | |
| 
 | |
| 
 | |
class Actor(object):
    """ A free-running coroutine that accepts and processes messages.

    Kind of the equivalent of an Erlang process, really.  It processes
    a queue of messages in the order that they were sent.  You must
    subclass this and implement your own version of received().

    The actor's reference count will never drop to zero while the
    coroutine exists; if you lose all references to the actor object
    it will never be freed.
    """
    def __init__(self, concurrency = 1):
        """ Constructs an Actor, kicking off a new coroutine to process the messages.

        The concurrency argument specifies how many messages the actor will try
        to process concurrently.  If it is 1, the actor will process messages
        serially.
        """
        self._mailbox = collections.deque()
        # Triggered by cast() to wake run_forever() when mail arrives.
        self._event = event()
        # Handle of the message-loop coroutine; the doctest kills it with
        # api.kill(a._killer) to clean up.
        self._killer = api.spawn(self.run_forever)
        self._pool = CoroutinePool(min_size=0, max_size=concurrency)

    def run_forever(self):
        """ Loops forever, continually checking the mailbox. """
        while True:
            if self._mailbox:
                # leave the message in the mailbox until after it's
                # been processed so the event doesn't get triggered
                # while in the received method
                self._pool.execute_async(
                    self.received, self._mailbox[0])
                self._mailbox.popleft()
            else:
                # Nothing to do: sleep until cast() triggers the event, then
                # install a fresh one for the next idle period.
                self._event.wait()
                self._event = event()

    def cast(self, message):
        """ Send a message to the actor.

        If the actor is busy, the message will be enqueued for later
        consumption.  There is no return value.

        >>> a = Actor()
        >>> a.received = lambda msg: msg
        >>> a.cast("hello")
        """
        self._mailbox.append(message)
        pending = len(self._mailbox)
        # if this is the only message, the coro could be waiting
        if pending == 1:
            self._event.send()

    def received(self, message):
        """ Called to process each incoming message.

        The default implementation just raises an exception, so
        replace it with something useful!

        >>> class Greeter(Actor):
        ...     def received(self, (message, evt) ):
        ...         print "received", message
        ...         if evt: evt.send()
        ...
        >>> a = Greeter()

        This example uses events to synchronize between the actor and the main
        coroutine in a predictable manner, but this kinda defeats the point of
        the Actor, so don't do it in a real application.

        >>> evt = event()
        >>> a.cast( ("message 1", evt) )
        >>> evt.wait()  # force it to run at this exact moment
        received message 1
        >>> evt.reset()
        >>> a.cast( ("message 2", None) )
        >>> a.cast( ("message 3", evt) )
        >>> evt.wait()
        received message 2
        received message 3

        >>> api.kill(a._killer)   # test cleanup
        """
        raise NotImplementedError()
 | |
| 
 | |
| 
 | |
def _test():
    """Announce the run on stdout, then execute this module's doctests."""
    import doctest
    # Parenthesised single-string form prints the same text as the bare
    # print statement.
    print("Running doctests.  There will be no further output if they succeed.")
    doctest.testmod()


if __name__ == "__main__":
    _test()
 | 
