"""\ @file api.py @author Bob Ippolito Copyright (c) 2005-2006, Bob Ippolito Copyright (c) 2007, Linden Research, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import sys import socket import string import linecache import inspect import traceback from eventlet.support import greenlet from eventlet import tls __all__ = [ 'call_after', 'exc_after', 'getcurrent', 'get_default_hub', 'get_hub', 'GreenletExit', 'kill', 'sleep', 'spawn', 'spew', 'switch', 'ssl_listener', 'tcp_listener', 'tcp_server', 'trampoline', 'unspew', 'use_hub', 'with_timeout', ] class TimeoutError(Exception): """Exception raised if an asynchronous operation times out""" pass _threadlocal = tls.local() def tcp_listener(address, backlog=50): """ Listen on the given (ip, port) *address* with a TCP socket. Returns a socket object on which one should call ``accept()`` to accept a connection on the newly bound socket. Generally, the returned socket will be passed to ``tcp_server()``, which accepts connections forever and spawns greenlets for each incoming connection. """ from eventlet import greenio, util socket = greenio.GreenSocket(util.tcp_socket()) util.socket_bind_and_listen(socket, address, backlog=backlog) return socket def ssl_listener(address, certificate, private_key): """Listen on the given (ip, port) *address* with a TCP socket that can do SSL. *certificate* and *private_key* should be the filenames of the appropriate certificate and private key files to use with the SSL socket. Returns a socket object on which one should call ``accept()`` to accept a connection on the newly bound socket. Generally, the returned socket will be passed to ``tcp_server()``, which accepts connections forever and spawns greenlets for each incoming connection. """ from eventlet import util socket = util.wrap_ssl(util.tcp_socket(), certificate, private_key) util.socket_bind_and_listen(socket, address) socket.is_secure = True return socket def connect_tcp(address, localaddr=None): """ Create a TCP connection to address (host, port) and return the socket. Optionally, bind to localaddr (host, port) first. """ from eventlet import greenio, util desc = greenio.GreenSocket(util.tcp_socket()) if localaddr is not None: desc.bind(localaddr) desc.connect(address) return desc def tcp_server(listensocket, server, *args, **kw): """ Given a socket, accept connections forever, spawning greenlets and executing *server* for each new incoming connection. When *listensocket* is closed, the ``tcp_server()`` greenlet will end. listensocket The socket from which to accept connections. server The callable to call when a new connection is made. \*args The positional arguments to pass to *server*. \*\*kw The keyword arguments to pass to *server*. """ try: try: while True: spawn(server, listensocket.accept(), *args, **kw) except socket.error, e: # Broken pipe means it was shutdown if e[0] != 32: raise finally: listensocket.close() def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError): """Suspend the current coroutine until the given socket object or file descriptor is ready to *read*, ready to *write*, or the specified *timeout* elapses, depending on arguments specified. To wait for *fd* to be ready to read, pass *read* ``=True``; ready to write, pass *write* ``=True``. To specify a timeout, pass the *timeout* argument in seconds. If the specified *timeout* elapses before the socket is ready to read or write, *timeout_exc* will be raised instead of ``trampoline()`` returning normally. """ t = None hub = get_hub() current = greenlet.getcurrent() assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' fileno = getattr(fd, 'fileno', lambda: fd)() def _do_close(_d, error=None): if error is None: current.throw(socket.error(32, 'Broken pipe')) else: current.throw(getattr(error, 'value', error)) # convert to socket.error def _do_timeout(): current.throw(timeout_exc()) def cb(d): current.switch() # with TwistedHub, descriptor actually an object (socket_rwdescriptor) which stores # this callback. If this callback stores a reference to the socket instance (fd) # then descriptor has a reference to that instance. This makes socket not collected # after greenlet exit. Since nobody actually uses the results of this switch, I removed # fd from here. If it will be needed than an indirect reference which is discarded right # after the switch above should be used. if timeout is not None: t = hub.schedule_call(timeout, _do_timeout) try: descriptor = hub.add_descriptor(fileno, read and cb, write and cb, _do_close) try: return hub.switch() finally: hub.remove_descriptor(descriptor) finally: if t is not None: t.cancel() def get_fileno(obj): try: f = obj.fileno except AttributeError: assert isinstance(obj, (int, long)) return obj else: return f() def select(read_list, write_list, error_list, timeout=None): hub = get_hub() t = None current = greenlet.getcurrent() assert hub.greenlet is not current, 'do not call blocking functions from the mainloop' ds = {} for r in read_list: ds[get_fileno(r)] = {'read' : r} for w in write_list: ds.setdefault(get_fileno(w), {})['write'] = w for e in error_list: ds.setdefault(get_fileno(e), {})['error'] = e descriptors = [] def on_read(d): original = ds[get_fileno(d)]['read'] current.switch(([original], [], [])) def on_write(d): original = ds[get_fileno(d)]['write'] current.switch(([], [original], [])) def on_error(d, _err=None): original = ds[get_fileno(d)]['error'] current.switch(([], [], [original])) def on_timeout(): current.switch(([], [], [])) if timeout is not None: t = hub.schedule_call(timeout, on_timeout) try: for k, v in ds.iteritems(): d = hub.add_descriptor(k, v.get('read') is not None and on_read, v.get('write') is not None and on_write, v.get('error') is not None and on_error) descriptors.append(d) try: return hub.switch() finally: for d in descriptors: hub.remove_descriptor(d) finally: if t is not None: t.cancel() def _spawn_startup(cb, args, kw, cancel=None): try: greenlet.getcurrent().parent.switch() cancel = None finally: if cancel is not None: cancel() return cb(*args, **kw) def _spawn(g): g.parent = greenlet.getcurrent() g.switch() class CancellingTimersGreenlet(greenlet.greenlet): def __init__(self, run=None, parent=None, hub=None): self._run = run if parent is None: parent = greenlet.getcurrent() if hub is None: hub = get_hub() self.hub = hub greenlet.greenlet.__init__(self, None, parent) def run(self, *args, **kwargs): try: return self._run(*args, **kwargs) finally: self.hub.cancel_timers(self, quiet=True) def spawn(function, *args, **kwds): """Create a new coroutine, or cooperative thread of control, within which to execute *function*. The *function* will be called with the given *args* and keyword arguments *kwds* and will remain in control unless it cooperatively yields by calling a socket method or ``sleep()``. ``spawn()`` returns control to the caller immediately, and *function* will be called in a future main loop iteration. An uncaught exception in *function* or any child will terminate the new coroutine with a log message. """ # killable t = None g = CancellingTimersGreenlet(_spawn_startup) t = get_hub().schedule_call_global(0, _spawn, g) g.switch(function, args, kwds, t.cancel) return g def kill(g): get_hub().schedule_call(0, g.throw) if getcurrent() is not get_hub().greenlet: sleep(0) def call_after_global(seconds, function, *args, **kwds): """Schedule *function* to be called after *seconds* have elapsed. The function will be scheduled even if the current greenlet has exited. *seconds* may be specified as an integer, or a float if fractional seconds are desired. The *function* will be called with the given *args* and keyword arguments *kwds*, and will be executed within the main loop's coroutine. Its return value is discarded. Any uncaught exception will be logged. """ # cancellable def startup(): g = CancellingTimersGreenlet(_spawn_startup) g.switch(function, args, kwds) g.switch() t = get_hub().schedule_call_global(seconds, startup) return t def call_after_local(seconds, function, *args, **kwds): """Schedule *function* to be called after *seconds* have elapsed. The function will NOT be called if the current greenlet has exited. *seconds* may be specified as an integer, or a float if fractional seconds are desired. The *function* will be called with the given *args* and keyword arguments *kwds*, and will be executed within the main loop's coroutine. Its return value is discarded. Any uncaught exception will be logged. """ # cancellable def startup(): g = CancellingTimersGreenlet(_spawn_startup) g.switch(function, args, kwds) g.switch() t = get_hub().schedule_call_local(seconds, startup) return t # for compatibility with original eventlet API call_after = call_after_local class _SilentException(BaseException): pass class timeout: """ >>> from __future__ import with_statement >>> from eventlet.api import sleep >>> DELAY = 0.01 Nothing happens if with-block finishes before the timeout expires >>> with timeout(DELAY*2): ... sleep(DELAY) >>> sleep(DELAY*2) # check if timer was actually cancelled An exception will be raised if it's not >>> with timeout(DELAY): ... sleep(DELAY*2) Traceback (most recent call last): ... TimeoutError You can customize the exception raised: >>> with timeout(DELAY, IOError("Operation takes way too long")): ... sleep(DELAY*2) Traceback (most recent call last): ... IOError: Operation takes way too long It's possible to cancel the timer inside the block: >>> with timeout(DELAY) as timer: ... timer.cancel() ... sleep(DELAY*2) To silent the exception, pass None as second parameter. The with-block will be interrupted with _SilentException, but it won't be propogated outside. >>> DELAY=0.1 >>> import time >>> start = time.time() >>> with timeout(DELAY, None): ... sleep(DELAY*2) >>> (time.time()-start)