This commit is contained in:
Denis Bilenko
2009-01-13 00:23:37 +06:00
41 changed files with 2174 additions and 1221 deletions

View File

@@ -15,9 +15,9 @@ Eventlet on top of twisted provides:
Eventlet features: Eventlet features:
* utilities for spawning and controlling greenlet execution: * utilities for spawning and controlling greenlet execution:
api.spawn, api.kill, coros.Job api.spawn, api.kill, proc module
* utilities for communicating between greenlets: * utilities for communicating between greenlets:
coros.event, coros.queue coros.event, coros.queue, proc module
* standard Python modules that won't block the reactor: * standard Python modules that won't block the reactor:
eventlet.green package eventlet.green package
* utilities specific to twisted hub: * utilities specific to twisted hub:
@@ -43,8 +43,8 @@ from eventlet.twistedutil import join_reactor
then start the reactor as you would do in a regular twisted application. then start the reactor as you would do in a regular twisted application.
For (2) just make sure that you have reactor installed before using For (2) just make sure that you have reactor installed before using
any of eventlet functions. Otherwise a on-twisted hub select will be any of eventlet functions. Otherwise a non-twisted hub will be selected
selected and twisted code won't work. and twisted code won't work.
Most of examples/twisted_* use twisted style with the exception of Most of examples/twisted_* use twisted style with the exception of
twisted_client.py and twisted_srvconnector.py. All of the non-twisted twisted_client.py and twisted_srvconnector.py. All of the non-twisted
@@ -58,11 +58,18 @@ callback, calling only a non-blocking subset of eventlet API here. The
following functions won't unschedule the current greenlet and are safe following functions won't unschedule the current greenlet and are safe
to call from anywhere: to call from anywhere:
1. Greenlet creation functions: api.spawn, coros.Job*.spawn_new, 1. Greenlet creation functions: api.spawn, proc.spawn,
twistedutil.deferToGreenThread and others based on api.spawn. twistedutil.deferToGreenThread and others based on api.spawn.
2. send(), send_exception(), poll(), ready() methods of coros.event, 2. send(), send_exception(), poll(), ready() methods of coros.event
coros.Job and _unbounded_ coros.queue. and _unbounded_ coros.queue.
3. wait(timeout=0) is identical to poll(). Currently only Proc.wait
supports timeout parameter.
4. Proc.link/link_value/link_exception
Other classes that use these names should follow the convention.
For an example on how to take advantage of eventlet in a twisted For an example on how to take advantage of eventlet in a twisted
application using deferToGreenThread see examples/twisted_http_proxy.py application using deferToGreenThread see examples/twisted_http_proxy.py
@@ -70,8 +77,8 @@ application using deferToGreenThread see examples/twisted_http_proxy.py
Although eventlet provides eventlet.green.socket module that implements Although eventlet provides eventlet.green.socket module that implements
interface of the standard Python socket, there's also a way to use twisted's interface of the standard Python socket, there's also a way to use twisted's
network code in a synchronous fashion via GreenTransport class. network code in a synchronous fashion via GreenTransport class.
A GreenTransport interface is reminiscent of socket although it's not a drop A GreenTransport interface is reminiscent of socket although it's not a drop-in
in replacement. It combines features of TCPTransport and Protocol in a single replacement. It combines features of TCPTransport and Protocol in a single
object: object:
* all of transport methods (like getPeer()) are available directly on * all of transport methods (like getPeer()) are available directly on
@@ -81,9 +88,8 @@ object:
* read() and recv() methods are provided to retrieve the data from protocol * read() and recv() methods are provided to retrieve the data from protocol
synchronously. synchronously.
To make a GreenTransport instance you can use To make a GreenTransport instance use twistedutil.protocol.GreenClientCreator
twistedutil.protocol.GreenTransportCreator (usage is similar to that of (usage is similar to that of twisted.internet.protocol.ClientCreator)
twisted.internet.protocol.ClientCreator)
For an example on how to get a connected GreenTransport instance, For an example on how to get a connected GreenTransport instance,
see twisted_client.py, twisted_srvconnect.py or twisted_portforward.py. see twisted_client.py, twisted_srvconnect.py or twisted_portforward.py.
@@ -117,11 +123,13 @@ Essential points
rejected with ValueError rejected with ValueError
greenlet == coroutine == green thread == microthread in this document Note, that there's no scheduler of any sort; if a coroutine wants to be
scheduled again it must take care of it itself. As an application developer,
Note, that there's no scheduler of any sort; if a coroutine wants to be scheduled again however, you don't need to worry about it as that's what eventlet does behind
it must take care of it itself. As an application developer, however, you don't need the scenes. The cost of that is that you should not use greenlet's switch() and
to worry about it as that's what eventlet does behind the scenes. throw() methods, they will likely leave the current greenlet unscheduled
forever. Eventlet also takes advantage of greenlet's `parent' attribute,
so you should not meddle with it either.
How does eventlet work How does eventlet work
@@ -137,12 +145,13 @@ When twisted calls user's callback it's expected to return almost immediately,
without any blocking I/O calls. Deferreds help there. without any blocking I/O calls. Deferreds help there.
Eventlet runs the main loop in a dedicated greenlet (MAIN_LOOP). It is the same Eventlet runs the main loop in a dedicated greenlet (MAIN_LOOP). It is the same
greenlet as MAIN if you use join_reactor. Otherwise it's a dedicated greenlet greenlet as MAIN if you use join_reactor. Otherwise it's a separate greenlet
started implicitly. The execution is organized in a such way that the switching started implicitly. The execution is organized in a such way that the switching
almost always involves MAIN_LOOP. All of the blocking use this algorithm: always involves MAIN_LOOP. All of functions in eventlet that appear "blocking"
use the following algorithm:
1. register a callback that switches back to the current greenlet when 1. register a callback that switches back to the current greenlet when
an event of interest happen an event of interest happens
2. switch to the MAIN_LOOP 2. switch to the MAIN_LOOP
For example, here's what eventlet's socket recv() does: For example, here's what eventlet's socket recv() does:

View File

@@ -45,6 +45,7 @@ def switch(coro, result=None, exc=None):
return coro.throw(exc) return coro.throw(exc)
return coro.switch(result) return coro.switch(result)
Greenlet = greenlet.greenlet
class TimeoutError(Exception): class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out""" """Exception raised if an asynchronous operation times out"""
@@ -244,24 +245,6 @@ def _spawn(g):
g.switch() g.switch()
class CancellingTimersGreenlet(greenlet.greenlet):
def __init__(self, run=None, parent=None, hub=None):
self._run = run
if parent is None:
parent = greenlet.getcurrent()
if hub is None:
hub = get_hub()
self.hub = hub
greenlet.greenlet.__init__(self, None, parent)
def run(self, *args, **kwargs):
try:
return self._run(*args, **kwargs)
finally:
self.hub.cancel_timers(self, quiet=True)
def spawn(function, *args, **kwds): def spawn(function, *args, **kwds):
"""Create a new coroutine, or cooperative thread of control, within which """Create a new coroutine, or cooperative thread of control, within which
to execute *function*. to execute *function*.
@@ -278,7 +261,7 @@ def spawn(function, *args, **kwds):
""" """
# killable # killable
t = None t = None
g = CancellingTimersGreenlet(_spawn_startup) g = Greenlet(_spawn_startup)
t = get_hub().schedule_call_global(0, _spawn, g) t = get_hub().schedule_call_global(0, _spawn, g)
g.switch(function, args, kwds, t.cancel) g.switch(function, args, kwds, t.cancel)
return g return g
@@ -301,7 +284,7 @@ def call_after_global(seconds, function, *args, **kwds):
""" """
# cancellable # cancellable
def startup(): def startup():
g = CancellingTimersGreenlet(_spawn_startup) g = Greenlet(_spawn_startup)
g.switch(function, args, kwds) g.switch(function, args, kwds)
g.switch() g.switch()
t = get_hub().schedule_call_global(seconds, startup) t = get_hub().schedule_call_global(seconds, startup)
@@ -320,7 +303,7 @@ def call_after_local(seconds, function, *args, **kwds):
""" """
# cancellable # cancellable
def startup(): def startup():
g = CancellingTimersGreenlet(_spawn_startup) g = Greenlet(_spawn_startup)
g.switch(function, args, kwds) g.switch(function, args, kwds)
g.switch() g.switch()
t = get_hub().schedule_call_local(seconds, startup) t = get_hub().schedule_call_local(seconds, startup)
@@ -329,7 +312,7 @@ def call_after_local(seconds, function, *args, **kwds):
# for compatibility with original eventlet API # for compatibility with original eventlet API
call_after = call_after_local call_after = call_after_local
class _SilentException(BaseException): class _SilentException(Exception):
pass pass
class FakeTimer: class FakeTimer:
@@ -486,7 +469,7 @@ def use_hub(mod=None):
if hasattr(_threadlocal, 'hub'): if hasattr(_threadlocal, 'hub'):
del _threadlocal.hub del _threadlocal.hub
if isinstance(mod, str): if isinstance(mod, str):
mod = __import__('eventlet.hubs.' + mod, fromlist=['Hub']) mod = __import__('eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
if hasattr(mod, 'Hub'): if hasattr(mod, 'Hub'):
_threadlocal.Hub = mod.Hub _threadlocal.Hub = mod.Hub
else: else:

View File

@@ -129,6 +129,11 @@ class event(object):
return self.wait() return self.wait()
return notready return notready
# QQQ make it return tuple (type, value, tb) instead of raising
# because
# 1) "poll" does not imply raising
# 2) it's better not to screw up caller's sys.exc_info() by default
# (e.g. if caller wants to calls the function in except or finally)
def poll_exception(self, notready=None): def poll_exception(self, notready=None):
if self.has_exception(): if self.has_exception():
return self.wait() return self.wait()
@@ -209,8 +214,11 @@ class event(object):
>>> api.sleep(0) >>> api.sleep(0)
received stuff received stuff
""" """
# why is waiter not used?
if waiter in self._waiters: if waiter in self._waiters:
del self._waiters[waiter] del self._waiters[waiter]
# XXX This does not check that waiter still waits when throw actually happens
# XXX and therefore is broken (see how send() deals with this)
api.get_hub().schedule_call( api.get_hub().schedule_call(
0, waiter.throw, Cancelled()) 0, waiter.throw, Cancelled())
@@ -253,239 +261,22 @@ class event(object):
while waiters: while waiters:
waiter = waiters.pop() waiter = waiters.pop()
if waiter in self._waiters: if waiter in self._waiters:
if waiters:
api.get_hub().schedule_call_global(0, self._do_send, result, exc, waiters)
if exc is None: if exc is None:
waiter.switch(result) waiter.switch(result)
else: else:
waiter.throw(*exc) waiter.throw(*exc)
break
def send_exception(self, *args): def send_exception(self, *args):
# the arguments and the same as for greenlet.throw # the arguments and the same as for greenlet.throw
return self.send(None, args) return self.send(None, args)
class Job(object):
"""Spawn a greenlet, control its execution and collect the result.
use spawn_new() classmethod to spawn a new coroutine and get a new Job instance;
use kill() method to kill the greenlet running the function;
use wait() method to collect the result of the function.
"""
def __init__(self, ev=None):
if ev is None:
ev = event()
self.event = event()
@classmethod
def spawn_new(cls, function, *args, **kwargs):
job = cls()
job.spawn(function, *args, **kwargs)
return job
def spawn(self, function, *args, **kwargs):
assert not hasattr(self, 'greenlet_ref'), 'spawn can only be used once per instance'
g = api.spawn(_collect_result, weakref.ref(self), function, *args, **kwargs)
self.greenlet_ref = weakref.ref(g)
# spawn_later can be also implemented here
@property
def greenlet(self):
return self.greenlet_ref()
def __nonzero__(self):
greenlet = self.greenlet_ref()
if greenlet is not None and not greenlet.dead:
return True
return False
def __repr__(self):
klass = type(self).__name__
if self.greenlet is not None and self.greenlet.dead:
dead = '(dead)'
else:
dead = ''
return '<%s greenlet=%r%s event=%s>' % (klass, self.greenlet, dead, self.event)
def wait(self):
"""Wait for the spawned greenlet to exit.
Return the result of the function if it completed without errors;
re-raise the exception otherwise.
Return GreenletExit() object if the greenlet was killed.
"""
return self.event.wait()
def poll(self, notready=None):
return self.event.poll(notready)
def poll_result(self, notready=None):
return self.event.poll_result(notready)
def poll_exception(self, notready=None):
return self.event.poll_exception(notready)
def ready(self):
return self.event.ready()
def has_result(self):
return self.event.has_result()
def has_exception(self):
return self.event.has_exception()
def _send(self, result):
self.event.send(result)
def _send_exception(self, *throw_args):
self.event.send_exception(*throw_args)
def kill(self, *throw_args):
greenlet = self.greenlet_ref()
if greenlet is not None:
return api.kill(greenlet, *throw_args)
def kill_after(self, seconds):
return api.call_after_global(seconds, _kill_by_ref, weakref.ref(self))
def _kill_by_ref(async_job_ref):
async_job = async_job_ref()
if async_job is not None:
async_job.kill()
def _collect_result(job_ref, function, *args, **kwargs):
"""Execute *function* and send its result to job_ref().
If function raises GreenletExit() it's trapped and sent as a regular value.
If job_ref points to a dead object or if DEBUG is true the exception
will be re-raised.
"""
try:
result = function(*args, **kwargs)
except api.GreenletExit, ex:
job = job_ref()
if job is not None:
job._send(ex)
except:
job = job_ref()
if job is not None:
job._send_exception(*sys.exc_info())
if not DEBUG:
return
raise # let hub log the exception
else:
job = job_ref()
if job is not None:
job._send(result)
class GroupMemberJob(Job):
def __init__(self, group_queue, event=None):
self._group_queue = group_queue
Job.__init__(self, event)
def _send(self, result):
self._group_queue.send((self, result, None))
def _send_exception(self, *throw_args):
self._group_queue.send((self, None, throw_args))
class JobGroupExit(api.GreenletExit):
pass
class JobGroup(object):
"""Spawn jobs in the context of the group: when one job raises an exception,
all other jobs are killed immediatelly.
To spawn a job use spawn_job method which returns a Job instance.
>>> group = JobGroup()
>>> job = group.spawn_new(api.get_hub().switch) # give up control to hub forever
>>> _ = group.spawn_new(int, 'bad') # raise ValueError
>>> job.wait()
JobGroupExit('Killed because of ValueError in the group',)
"""
def __init__(self):
self._queue = queue()
self._jobs = []
self._waiter_job = Job.spawn_new(self._waiter)
self._killerror = None
def spawn_new(self, function, *args, **kwargs):
assert self._waiter_job.poll('run') == 'run'
job = GroupMemberJob(self._queue)
self._jobs.append(job)
if self._killerror is None:
job.spawn(function, *args, **kwargs)
else:
job.event.send(self._killerror)
return job
def kill_all(self, *throw_args):
assert self._waiter_job.poll('run') == 'run', '_waiter_job must live'
for job in self._jobs:
g = job.greenlet
if g is not None:
api.get_hub().schedule_call(0, g.throw, *throw_args)
api.sleep(0)
# QQQ: add kill_all_later(seconds, throw_args)
# add kill_delay attribute
def complete(self, *jobs):
assert self._waiter_job.poll('run') == 'run'
left = set(jobs)
for job in jobs:
if job.ready():
left.remove(job)
for job in left:
job.wait()
# XXX make jobs a list, because wait methods will have timeout parameter soon
def wait(self, *jobs):
self.complete(*jobs)
return [x.wait() for x in jobs]
def complete_all(self):
while True:
count = len(self._jobs)
self.complete(*self._jobs)
# it's possible that more jobs were added while we were waiting
if count == len(self._jobs):
break
def wait_all(self):
self.complete_all()
return [x.wait() for x in self._jobs]
def _waiter(self):
# XXX: this lives forever, fix it to exit after all jobs died
# XXX: add __nonzero__ method that returns whether JobGroup is alive
# XXX: 3 states: True (alive), finishing, False (all dead)
while True:
job, result, throw_args = self._queue.wait()
if throw_args is None:
if not job.event.ready():
job.event.send(result)
else:
if not job.event.ready():
job.event.send_exception(*throw_args)
if self._killerror is None:
type = throw_args[0]
self._killerror = JobGroupExit('Killed because of %s in the group' % type.__name__)
self.kill_all(self._killerror)
# cannot exit here, as I need to deliver GreenExits
class multievent(object): class multievent(object):
"""is an event that can hold more than one value (it cannot be cancelled though) """is an event that can hold more than one value (it cannot be cancelled though)
is like a queue, but if there're waiters blocked, send/send_exception will wake up is like a queue, but if there're waiters blocked, send/send_exception will wake up
all of them, just like an event will do (queue will wake up only one) all of them, just like an event will do (queue will wake up only one)
""" """
# XXX to be removed
def __init__(self): def __init__(self):
self.items = collections.deque() self.items = collections.deque()
@@ -590,7 +381,6 @@ class BoundedSemaphore(object):
the calling coroutine until count becomes nonzero again. Attempting to the calling coroutine until count becomes nonzero again. Attempting to
release() after count has reached limit suspends the calling coroutine until release() after count has reached limit suspends the calling coroutine until
count becomes less than limit again. count becomes less than limit again.
""" """
def __init__(self, count, limit): def __init__(self, count, limit):
if count > limit: if count > limit:
@@ -632,9 +422,9 @@ class BoundedSemaphore(object):
def _do_unlock(self): def _do_unlock(self):
if self._release_waiters and self._acquire_waiters: if self._release_waiters and self._acquire_waiters:
api.get_hub().schedule_call_global(0, self._do_acquire)
waiter, _unused = self._release_waiters.popitem() waiter, _unused = self._release_waiters.popitem()
waiter.switch() waiter.switch()
self._do_acquire()
def _do_release(self): def _do_release(self):
if self._release_waiters and self.counter<self.limit: if self._release_waiters and self.counter<self.limit:
@@ -824,7 +614,7 @@ class CoroutinePool(pools.Pool):
sender = event() sender = event()
(evt, func, args, kw) = recvd (evt, func, args, kw) = recvd
self._safe_apply(evt, func, args, kw) self._safe_apply(evt, func, args, kw)
api.get_hub().cancel_timers(api.getcurrent()) #api.get_hub().cancel_timers(api.getcurrent())
# Likewise, delete these variables or else they will # Likewise, delete these variables or else they will
# be referenced by this frame until replaced by the # be referenced by this frame until replaced by the
# next recvd, which may or may not be a long time from # next recvd, which may or may not be a long time from

606
eventlet/httpd.py Normal file
View File

@@ -0,0 +1,606 @@
"""\
@file httpd.py
@author Donovan Preston
Copyright (c) 2005-2006, Donovan Preston
Copyright (c) 2007, Linden Research, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import cgi
import errno
from eventlet.green import socket
import sys
import time
from eventlet.green import urllib
import traceback
from eventlet.green import BaseHTTPServer
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from eventlet import api
from eventlet import coros
DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1'
USE_ACCESS_LOG = True
CONNECTION_CLOSED = (errno.EPIPE, errno.ECONNRESET)
class ErrorResponse(Exception):
_responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
def __init__(self, code, reason_phrase=None, headers=None, body=None):
Exception.__init__(self, reason_phrase)
self.code = code
if reason_phrase is None:
self.reason = self._responses[code][0]
else:
self.reason = reason_phrase
self.headers = headers
if body is None:
self.body = self._responses[code][1]
else:
self.body = body
class Request(object):
_method = None
_path = None
_responsecode = 200
_reason_phrase = None
_request_started = False
_chunked = False
_producer_adapters = {}
depth = 0
def __init__(self, protocol, method, path, headers):
self.context = {}
self.request_start_time = time.time()
self.site = protocol.server.site
self.protocol = protocol
self._method = method
if '?' in path:
self._path, self._query = path.split('?', 1)
self._query = self._query.replace('&amp;', '&')
else:
self._path, self._query = path, None
self._incoming_headers = headers
self._outgoing_headers = dict()
def response(self, code, reason_phrase=None, headers=None, body=None):
"""Change the response code. This will not be sent until some
data is written; last call to this method wins. Default is
200 if this is not called.
"""
self._responsecode = code
self._reason_phrase = reason_phrase
self.protocol.set_response_code(self, code, reason_phrase)
if headers is not None:
try:
headers = headers.iteritems()
except AttributeError:
pass
for key, value in headers:
self.set_header(key, value)
if body is not None:
self.write(body)
def is_okay(self):
return 200 <= self._responsecode <= 299
def full_url(self):
path = self.path()
query = self.query()
if query:
path = path + '?' + query
via = self.get_header('via', '')
if via.strip():
next_part = iter(via.split()).next
received_protocol = next_part()
received_by = next_part()
if received_by.endswith(','):
received_by = received_by[:-1]
else:
comment = ''
while not comment.endswith(','):
try:
comment += next_part()
except StopIteration:
comment += ','
break
comment = comment[:-1]
else:
received_by = self.get_header('host')
return '%s://%s%s' % (self.request_protocol(), received_by, path)
def begin_response(self, length="-"):
"""Begin the response, and return the initial response text
"""
self._request_started = True
request_time = time.time() - self.request_start_time
code = self._responsecode
proto = self.protocol
if USE_ACCESS_LOG:
proto.server.write_access_log_line(
proto.client_address[0],
time.strftime("%d/%b/%Y %H:%M:%S"),
proto.requestline,
code,
length,
request_time)
if self._reason_phrase is not None:
message = self._reason_phrase.split("\n")[0]
elif code in proto.responses:
message = proto.responses[code][0]
else:
message = ''
if proto.request_version == 'HTTP/0.9':
return []
response_lines = proto.generate_status_line()
if not self._outgoing_headers.has_key('connection'):
con = self.get_header('connection')
if con is None and proto.request_version == 'HTTP/1.0':
con = 'close'
if con is not None:
self.set_header('connection', con)
for key, value in self._outgoing_headers.items():
key = '-'.join([x.capitalize() for x in key.split('-')])
response_lines.append("%s: %s" % (key, value))
response_lines.append("")
return response_lines
def write(self, obj):
"""Writes an arbitrary object to the response, using
the sitemap's adapt method to convert it to bytes.
"""
if isinstance(obj, str):
self._write_bytes(obj)
elif isinstance(obj, unicode):
# use utf8 encoding for now, *TODO support charset negotiation
# Content-Type: text/html; charset=utf-8
ctype = self._outgoing_headers.get('content-type', 'text/html')
ctype = ctype + '; charset=utf-8'
self._outgoing_headers['content-type'] = ctype
self._write_bytes(obj.encode('utf8'))
else:
self.site.adapt(obj, self)
def _write_bytes(self, data):
"""Write all the data of the response.
Can be called just once.
"""
if self._request_started:
print "Request has already written a response:"
traceback.print_stack()
return
self._outgoing_headers['content-length'] = len(data)
response_lines = self.begin_response(len(data))
response_lines.append(data)
self.protocol.wfile.write("\r\n".join(response_lines))
if hasattr(self.protocol.wfile, 'flush'):
self.protocol.wfile.flush()
def method(self):
return self._method
def path(self):
return self._path
def path_segments(self):
return [urllib.unquote_plus(x) for x in self._path.split('/')[1:]]
def query(self):
return self._query
def uri(self):
if self._query:
return '%s?%s' % (
self._path, self._query)
return self._path
def get_headers(self):
return self._incoming_headers
def get_header(self, header_name, default=None):
return self.get_headers().get(header_name.lower(), default)
def get_query_pairs(self):
if not hasattr(self, '_split_query'):
if self._query is None:
self._split_query = ()
else:
spl = self._query.split('&')
spl = [x.split('=', 1) for x in spl if x]
self._split_query = []
for query in spl:
if len(query) == 1:
key = query[0]
value = ''
else:
key, value = query
self._split_query.append((urllib.unquote_plus(key), urllib.unquote_plus(value)))
return self._split_query
def get_queries_generator(self, name):
"""Generate all query parameters matching the given name.
"""
for key, value in self.get_query_pairs():
if key == name or not name:
yield value
get_queries = lambda self, name: list(self.get_queries_generator)
def get_query(self, name, default=None):
try:
return self.get_queries_generator(name).next()
except StopIteration:
return default
def get_arg_list(self, name):
return self.get_field_storage().getlist(name)
def get_arg(self, name, default=None):
return self.get_field_storage().getfirst(name, default)
def get_field_storage(self):
if not hasattr(self, '_field_storage'):
if self.method() == 'GET':
data = ''
if self._query:
data = self._query
else:
data = self.read_body()
fl = StringIO(data)
## Allow our resource to provide the FieldStorage instance for
## customization purposes.
headers = self.get_headers()
environ = dict(
REQUEST_METHOD='POST',
QUERY_STRING=self._query or '')
self._field_storage = cgi.FieldStorage(fl, headers, environ=environ)
return self._field_storage
def set_header(self, key, value):
if key.lower() == 'connection' and value.lower() == 'close':
self.protocol.close_connection = 1
self._outgoing_headers[key.lower()] = value
__setitem__ = set_header
def get_outgoing_header(self, key):
return self._outgoing_headers[key.lower()]
def has_outgoing_header(self, key):
return self._outgoing_headers.has_key(key.lower())
def socket(self):
return self.protocol.socket
def error(self, response=None, body=None, log_traceback=True):
if log_traceback:
traceback.print_exc(file=self.log)
if response is None:
response = 500
if body is None:
typ, val, tb = sys.exc_info()
body = dict(type=str(typ), error=True, reason=str(val))
self.response(response)
if(type(body) is str and not self.response_written()):
self.write(body)
return
try:
produce(body, self)
except Exception, e:
traceback.print_exc(file=self.log)
if not self.response_written():
self.write('Internal Server Error')
def not_found(self):
self.error(404, 'Not Found\n', log_traceback=False)
def raw_body(self):
if not hasattr(self, '_cached_body'):
self.read_body()
return self._cached_body
def read_body(self):
""" Returns the string body that was read off the request, or
the empty string if there was no request body.
Requires a content-length header. Caches the body so multiple
calls to read_body() are free.
"""
if not hasattr(self, '_cached_body'):
length = self.get_header('content-length')
if length:
length = int(length)
if length:
self._cached_body = self.protocol.rfile.read(length)
else:
self._cached_body = ''
return self._cached_body
def parsed_body(self):
""" Returns the parsed version of the body, using the
content-type header to select from the parsers on the site
object.
If no parser is found, returns the string body from
read_body(). Caches the parsed body so multiple calls to
parsed_body() are free.
"""
if not hasattr(self, '_cached_parsed_body'):
body = self.read_body()
if hasattr(self.site, 'parsers'):
ct = self.get_header('content-type')
parser = self.site.parsers.get(ct)
if parser is not None:
body = parser(body)
else:
ex = ValueError("Could not find parser for content-type: %s" % ct)
ex.body = body
raise ex
self._cached_parsed_body = body
return self._cached_parsed_body
def override_body(self, body):
if not hasattr(self, '_cached_parsed_body'):
self.read_body() ## Read and discard body
self._cached_parsed_body = body
def response_written(self):
## TODO change badly named variable
return self._request_started
def request_version(self):
return self.protocol.request_version
def request_protocol(self):
if self.protocol.is_secure:
return "https"
return "http"
def server_address(self):
return self.protocol.server.address
def __repr__(self):
return "<Request %s %s>" % (
getattr(self, '_method'), getattr(self, '_path'))
DEFAULT_TIMEOUT = 300
# This value was chosen because apache 2 has a default limit of 8190.
# I believe that slightly smaller number is because apache does not
# count the \r\n.
MAX_REQUEST_LINE = 8192
class Timeout(RuntimeError):
pass
class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.rfile = self.wfile = request.makeGreenFile()
self.is_secure = request.is_secure
request.close() # close this now so that when rfile and wfile are closed, the socket gets closed
self.client_address = client_address
self.server = server
self.set_response_code(None, 200, None)
self.protocol_version = server.max_http_version
def close(self):
self.rfile.close()
self.wfile.close()
def set_response_code(self, request, code, message):
self._code = code
if message is not None:
self._message = message.split("\n")[0]
elif code in self.responses:
self._message = self.responses[code][0]
else:
self._message = ''
def generate_status_line(self):
return [
"%s %d %s" % (
self.protocol_version, self._code, self._message)]
def write_bad_request(self, status, reason):
self.set_response_code(self, status, reason)
self.wfile.write(''.join(self.generate_status_line()))
self.wfile.write('\r\nServer: %s\r\n' % self.version_string())
self.wfile.write('Date: %s\r\n' % self.date_time_string())
self.wfile.write('Content-Length: 0\r\n\r\n')
def handle(self):
self.close_connection = 0
timeout = DEFAULT_TIMEOUT
while not self.close_connection:
if timeout == 0:
break
cancel = api.exc_after(timeout, Timeout)
try:
self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE)
if self.raw_requestline is not None:
if len(self.raw_requestline) == MAX_REQUEST_LINE:
# Someone sent a request line which is too
# large. Be helpful and tell them.
self.write_bad_request(414, 'Request-URI Too Long')
self.close_connection = True
continue
except socket.error, e:
if e[0] in CONNECTION_CLOSED:
self.close_connection = True
cancel.cancel()
continue
except Timeout:
self.close_connection = True
continue
except Exception, e:
try:
if e[0][0][0].startswith('SSL'):
print "SSL Error:", e[0][0]
self.close_connection = True
cancel.cancel()
continue
except Exception, f:
print "Exception in ssl test:",f
pass
raise e
cancel.cancel()
if not self.raw_requestline or not self.parse_request():
self.close_connection = True
continue
self.set_response_code(None, 200, None)
request = Request(self, self.command, self.path, self.headers)
request.set_header('Server', self.version_string())
request.set_header('Date', self.date_time_string())
try:
timeout = int(request.get_header('keep-alive', timeout))
except TypeError, ValueError:
pass
try:
try:
try:
self.server.site.handle_request(request)
except ErrorResponse, err:
request.response(code=err.code,
reason_phrase=err.reason,
headers=err.headers,
body=err.body)
finally:
# clean up any timers that might have been left around by the handling code
pass
#api.get_hub().cancel_timers(api.getcurrent())
# throw an exception if it failed to write a body
if not request.response_written():
raise NotImplementedError("Handler failed to write response to request: %s" % request)
if not hasattr(self, '_cached_body'):
try:
request.read_body() ## read & discard body
except:
pass
except socket.error, e:
# Broken pipe, connection reset by peer
if e[0] in CONNECTION_CLOSED:
#print "Remote host closed connection before response could be sent"
pass
else:
raise
except Exception, e:
self.server.log_message("Exception caught in HttpRequest.handle():\n")
self.server.log_exception(*sys.exc_info())
if not request.response_written():
request.response(500)
request.write('Internal Server Error')
self.close()
raise e # can't do a plain raise since exc_info might have been cleared
self.close()
class Server(BaseHTTPServer.HTTPServer):
def __init__(self, socket, address, site, log, max_http_version=DEFAULT_MAX_HTTP_VERSION):
self.socket = socket
self.address = address
self.site = site
self.max_http_version = max_http_version
if log:
self.log = log
if hasattr(log, 'info'):
log.write = log.info
else:
self.log = self
def write(self, something):
sys.stdout.write('%s' % (something, )); sys.stdout.flush()
def log_message(self, message):
self.log.write(message)
def log_exception(self, type, value, tb):
self.log.write(''.join(traceback.format_exception(type, value, tb)))
def write_access_log_line(self, *args):
"""Write a line to the access.log. Arguments:
client_address, date_time, requestline, code, size, request_time
"""
self.log.write(
'%s - - [%s] "%s" %s %s %.6f\n' % args)
def server(sock, site, log=None, max_size=512, serv=None, max_http_version=DEFAULT_MAX_HTTP_VERSION):
pool = coros.CoroutinePool(max_size=max_size)
if serv is None:
serv = Server(sock, sock.getsockname(), site, log, max_http_version=max_http_version)
try:
serv.log.write("httpd starting up on %s\n" % (sock.getsockname(), ))
while True:
try:
new_sock, address = sock.accept()
proto = HttpProtocol(new_sock, address, serv)
pool.execute_async(proto.handle)
api.sleep(0) # sleep to allow other coros to run
except KeyboardInterrupt:
api.get_hub().remove_descriptor(sock.fileno())
serv.log.write("httpd exiting\n")
break
finally:
try:
sock.close()
except socket.error:
pass
if __name__ == '__main__':
class TestSite(object):
def handle_request(self, req):
req.write('hello')
server(
api.tcp_listener(('127.0.0.1', 8080)),
TestSite())

View File

@@ -22,24 +22,21 @@ THE SOFTWARE.
""" """
import bisect import bisect
import weakref
import sys import sys
import socket
import errno
import traceback import traceback
import time import time
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet
from eventlet.timer import Timer from eventlet.timer import Timer, LocalTimer
_g_debug = True _g_debug = True
class BaseHub(object): class BaseHub(object):
""" Base hub class for easing the implementation of subclasses that are """ Base hub class for easing the implementation of subclasses that are
specific to a particular underlying event architecture. """ specific to a particular underlying event architecture. """
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit) SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
def __init__(self, clock=time.time): def __init__(self, clock=time.time):
self.readers = {} self.readers = {}
self.writers = {} self.writers = {}
@@ -51,7 +48,6 @@ class BaseHub(object):
self.stopping = False self.stopping = False
self.running = False self.running = False
self.timers = [] self.timers = []
self.timers_by_greenlet = {}
self.next_timers = [] self.next_timers = []
self.observers = {} self.observers = {}
self.observer_modes = { self.observer_modes = {
@@ -64,12 +60,12 @@ class BaseHub(object):
def add_descriptor(self, fileno, read=None, write=None, exc=None): def add_descriptor(self, fileno, read=None, write=None, exc=None):
""" Signals an intent to read/write from a particular file descriptor. """ Signals an intent to read/write from a particular file descriptor.
The fileno argument is the file number of the file of interest. The other The fileno argument is the file number of the file of interest. The other
arguments are either callbacks or None. If there is a callback for read arguments are either callbacks or None. If there is a callback for read
or write, the hub sets things up so that when the file descriptor is or write, the hub sets things up so that when the file descriptor is
ready to be read or written, the callback is called. ready to be read or written, the callback is called.
The exc callback is called when the socket represented by the file The exc callback is called when the socket represented by the file
descriptor is closed. The intent is that the the exc callbacks should descriptor is closed. The intent is that the the exc callbacks should
only be present when either a read or write callback is also present, only be present when either a read or write callback is also present,
@@ -93,7 +89,7 @@ class BaseHub(object):
self.excs.pop(fileno, None) self.excs.pop(fileno, None)
self.waiters_by_greenlet[greenlet.getcurrent()] = fileno self.waiters_by_greenlet[greenlet.getcurrent()] = fileno
return fileno return fileno
def remove_descriptor(self, fileno): def remove_descriptor(self, fileno):
self.readers.pop(fileno, None) self.readers.pop(fileno, None)
self.writers.pop(fileno, None) self.writers.pop(fileno, None)
@@ -142,7 +138,7 @@ class BaseHub(object):
self.remove_descriptor(fileno) self.remove_descriptor(fileno)
except Exception, e: except Exception, e:
print >>sys.stderr, "Exception while removing descriptor! %r" % (e,) print >>sys.stderr, "Exception while removing descriptor! %r" % (e,)
def wait(self, seconds=None): def wait(self, seconds=None):
raise NotImplementedError("Implement this in a subclass") raise NotImplementedError("Implement this in a subclass")
@@ -154,7 +150,7 @@ class BaseHub(object):
if not t: if not t:
return None return None
return t[0][0] return t[0][0]
def run(self): def run(self):
"""Run the runloop until abort is called. """Run the runloop until abort is called.
""" """
@@ -220,12 +216,12 @@ class BaseHub(object):
""" """
for mode in self.observers.pop(observer, ()): for mode in self.observers.pop(observer, ()):
self.observer_modes[mode].remove(observer) self.observer_modes[mode].remove(observer)
def squelch_observer_exception(self, observer, exc_info): def squelch_observer_exception(self, observer, exc_info):
traceback.print_exception(*exc_info) traceback.print_exception(*exc_info)
print >>sys.stderr, "Removing observer: %r" % (observer,) print >>sys.stderr, "Removing observer: %r" % (observer,)
self.remove_observer(observer) self.remove_observer(observer)
def fire_observers(self, activity): def fire_observers(self, activity):
for observer in self.observer_modes[activity]: for observer in self.observer_modes[activity]:
try: try:
@@ -243,27 +239,13 @@ class BaseHub(object):
# the 0 placeholder makes it easy to bisect_right using (now, 1) # the 0 placeholder makes it easy to bisect_right using (now, 1)
self.next_timers.append((when, 0, info)) self.next_timers.append((when, 0, info))
def add_timer(self, timer, track=True): def add_timer(self, timer):
scheduled_time = self.clock() + timer.seconds scheduled_time = self.clock() + timer.seconds
self._add_absolute_timer(scheduled_time, timer) self._add_absolute_timer(scheduled_time, timer)
if track:
self.track_timer(timer)
return scheduled_time return scheduled_time
def track_timer(self, timer):
current_greenlet = greenlet.getcurrent()
timer.greenlet = current_greenlet
self.timers_by_greenlet.setdefault(
current_greenlet,
weakref.WeakKeyDictionary())[timer] = True
def timer_finished(self, timer): def timer_finished(self, timer):
try: pass
del self.timers_by_greenlet[timer.greenlet][timer]
if not self.timers_by_greenlet[timer.greenlet]:
del self.timers_by_greenlet[timer.greenlet]
except (KeyError, AttributeError):
pass
def timer_canceled(self, timer): def timer_canceled(self, timer):
self.timer_finished(timer) self.timer_finished(timer)
@@ -283,8 +265,8 @@ class BaseHub(object):
*args: Arguments to pass to the callable when called. *args: Arguments to pass to the callable when called.
**kw: Keyword arguments to pass to the callable when called. **kw: Keyword arguments to pass to the callable when called.
""" """
t = Timer(seconds, cb, *args, **kw) t = LocalTimer(seconds, cb, *args, **kw)
self.add_timer(t, track=True) self.add_timer(t)
return t return t
schedule_call = schedule_call_local schedule_call = schedule_call_local
@@ -299,7 +281,7 @@ class BaseHub(object):
**kw: Keyword arguments to pass to the callable when called. **kw: Keyword arguments to pass to the callable when called.
""" """
t = Timer(seconds, cb, *args, **kw) t = Timer(seconds, cb, *args, **kw)
self.add_timer(t, track=False) self.add_timer(t)
return t return t
def fire_timers(self, when): def fire_timers(self, when):
@@ -319,26 +301,6 @@ class BaseHub(object):
self.timer_finished(timer) self.timer_finished(timer)
del t[:last] del t[:last]
def cancel_timers(self, greenlet, quiet=False):
if greenlet not in self.timers_by_greenlet:
return
for timer in self.timers_by_greenlet[greenlet].keys():
if not timer.cancelled and not timer.called and timer.seconds:
## If timer.seconds is 0, this isn't a timer, it's
## actually eventlet's silly way of specifying whether
## a coroutine is "ready to run" or not.
try:
# this might be None due to weirdness with weakrefs
timer.cancel()
except TypeError:
pass
if _g_debug and not quiet:
print 'Hub cancelling left-over timer %s' % timer
try:
del self.timers_by_greenlet[greenlet]
except KeyError:
pass
# for debugging: # for debugging:
def get_readers(self): def get_readers(self):

View File

@@ -29,7 +29,6 @@ import errno
import traceback import traceback
import time import time
from eventlet.timer import Timer
from eventlet.hubs import hub from eventlet.hubs import hub
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet
@@ -112,13 +111,11 @@ class Hub(hub.BaseHub):
self.interrupted = False self.interrupted = False
raise KeyboardInterrupt() raise KeyboardInterrupt()
def add_timer(self, timer, track=True): def add_timer(self, timer):
# store the pyevent timer object so that we can cancel later # store the pyevent timer object so that we can cancel later
eventtimer = libev.Timer(timer.seconds, 0, self._evloop, timer) eventtimer = libev.Timer(timer.seconds, 0, self._evloop, timer)
timer.impltimer = eventtimer timer.impltimer = eventtimer
eventtimer.start() eventtimer.start()
if track:
self.track_timer(timer)
def timer_finished(self, timer): def timer_finished(self, timer):
try: try:

View File

@@ -117,13 +117,11 @@ class Hub(hub.BaseHub):
self.interrupted = False self.interrupted = False
raise KeyboardInterrupt() raise KeyboardInterrupt()
def add_timer(self, timer, track=True): def add_timer(self, timer):
# store the pyevent timer object so that we can cancel later # store the pyevent timer object so that we can cancel later
eventtimer = event.timeout(timer.seconds, timer) eventtimer = event.timeout(timer.seconds, timer)
timer.impltimer = eventtimer timer.impltimer = eventtimer
eventtimer.add() eventtimer.add()
if track:
self.track_timer(timer)
def timer_finished(self, timer): def timer_finished(self, timer):
try: try:

View File

@@ -1,12 +1,29 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys import sys
import threading import threading
import weakref
from twisted.internet.base import DelayedCall as TwistedDelayedCall from twisted.internet.base import DelayedCall as TwistedDelayedCall
from eventlet.hubs.hub import _g_debug
from eventlet.support.greenlet import greenlet from eventlet.support.greenlet import greenlet
import traceback
class DelayedCall(TwistedDelayedCall): class DelayedCall(TwistedDelayedCall):
"fix DelayedCall to behave like eventlet's Timer in some respects" "fix DelayedCall to behave like eventlet's Timer in some respects"
@@ -17,15 +34,31 @@ class DelayedCall(TwistedDelayedCall):
return return
return TwistedDelayedCall.cancel(self) return TwistedDelayedCall.cancel(self)
def callLater(reactor, _seconds, _f, *args, **kw): class LocalDelayedCall(DelayedCall):
# the same as original but creates fixed DelayedCall instance
def __init__(self, *args, **kwargs):
self.greenlet = greenlet.getcurrent()
DelayedCall.__init__(self, *args, **kwargs)
def _get_cancelled(self):
if self.greenlet is None or self.greenlet.dead:
return True
return self.__dict__['cancelled']
def _set_cancelled(self, value):
self.__dict__['cancelled'] = value
cancelled = property(_get_cancelled, _set_cancelled)
def callLater(DelayedCallClass, reactor, _seconds, _f, *args, **kw):
# the same as original but creates fixed DelayedCall instance
assert callable(_f), "%s is not callable" % _f assert callable(_f), "%s is not callable" % _f
assert sys.maxint >= _seconds >= 0, \ assert sys.maxint >= _seconds >= 0, \
"%s is not greater than or equal to 0 seconds" % (_seconds,) "%s is not greater than or equal to 0 seconds" % (_seconds,)
tple = DelayedCall(reactor.seconds() + _seconds, _f, args, kw, tple = DelayedCallClass(reactor.seconds() + _seconds, _f, args, kw,
reactor._cancelCallLater, reactor._cancelCallLater,
reactor._moveCallLaterSooner, reactor._moveCallLaterSooner,
seconds=reactor.seconds) seconds=reactor.seconds)
reactor._newTimedCalls.append(tple) reactor._newTimedCalls.append(tple)
return tple return tple
@@ -62,7 +95,7 @@ class socket_rwdescriptor:
class BaseTwistedHub(object): class BaseTwistedHub(object):
"""This hub does not run a dedicated greenlet for the mainloop (unlike TwistedHub). """This hub does not run a dedicated greenlet for the mainloop (unlike TwistedHub).
Instead, it assumes that the mainloop is run in the main greenlet. Instead, it assumes that the mainloop is run in the main greenlet.
This makes running "green" functions in the main greenlet impossible but is useful This makes running "green" functions in the main greenlet impossible but is useful
when you want to call reactor.run() yourself. when you want to call reactor.run() yourself.
""" """
@@ -74,7 +107,6 @@ class BaseTwistedHub(object):
def __init__(self, mainloop_greenlet): def __init__(self, mainloop_greenlet):
self.greenlet = mainloop_greenlet self.greenlet = mainloop_greenlet
self.waiters_by_greenlet = {} self.waiters_by_greenlet = {}
self.timers_by_greenlet = {}
def switch(self): def switch(self):
assert greenlet.getcurrent() is not self.greenlet, 'Impossible to switch() from the mainloop greenlet' assert greenlet.getcurrent() is not self.greenlet, 'Impossible to switch() from the mainloop greenlet'
@@ -117,75 +149,18 @@ class BaseTwistedHub(object):
def schedule_call_local(self, seconds, func, *args, **kwargs): def schedule_call_local(self, seconds, func, *args, **kwargs):
from twisted.internet import reactor from twisted.internet import reactor
def call_with_timer_attached(*args1, **kwargs1): def call_if_greenlet_alive(*args1, **kwargs1):
try: if timer.greenlet.dead:
return func(*args1, **kwargs1) return
finally: return func(*args1, **kwargs1)
if seconds: timer = callLater(LocalDelayedCall, reactor, seconds, call_if_greenlet_alive, *args, **kwargs)
self.timer_finished(timer)
timer = callLater(reactor, seconds, call_with_timer_attached, *args, **kwargs)
if seconds:
self.track_timer(timer)
return timer return timer
schedule_call = schedule_call_local schedule_call = schedule_call_local
def schedule_call_global(self, seconds, func, *args, **kwargs): def schedule_call_global(self, seconds, func, *args, **kwargs):
from twisted.internet import reactor from twisted.internet import reactor
return callLater(reactor, seconds, func, *args, **kwargs) return callLater(DelayedCall, reactor, seconds, func, *args, **kwargs)
def track_timer(self, timer):
try:
current_greenlet = greenlet.getcurrent()
timer.greenlet = current_greenlet
self.timers_by_greenlet.setdefault(
current_greenlet,
weakref.WeakKeyDictionary())[timer] = True
except:
print 'track_timer failed'
traceback.print_exc()
raise
def timer_finished(self, timer):
try:
greenlet = timer.greenlet
del self.timers_by_greenlet[greenlet][timer]
if not self.timers_by_greenlet[greenlet]:
del self.timers_by_greenlet[greenlet]
except (AttributeError, KeyError):
pass
except:
print 'timer_finished failed'
traceback.print_exc()
raise
def cancel_timers(self, greenlet, quiet=False):
try:
if greenlet not in self.timers_by_greenlet:
return
for timer in self.timers_by_greenlet[greenlet].keys():
if not timer.cancelled and not timer.called and hasattr(timer, 'greenlet'):
## If timer.seconds is 0, this isn't a timer, it's
## actually eventlet's silly way of specifying whether
## a coroutine is "ready to run" or not.
## TwistedHub: I do the same, by not attaching 'greenlet' attribute to zero-timers QQQ
try:
# this might be None due to weirdness with weakrefs
timer.cancel()
except TypeError:
pass
if _g_debug and not quiet:
print 'Hub cancelling left-over timer %s' % timer
try:
del self.timers_by_greenlet[greenlet]
except KeyError:
pass
except:
print 'cancel_timers failed'
import traceback
traceback.print_exc()
if not quiet:
raise
def abort(self): def abort(self):
from twisted.internet import reactor from twisted.internet import reactor
@@ -253,7 +228,7 @@ class TwistedHub(BaseTwistedHub):
def run(self, installSignalHandlers=None): def run(self, installSignalHandlers=None):
if installSignalHandlers is None: if installSignalHandlers is None:
installSignalHandlers = self.installSignalHandlers installSignalHandlers = self.installSignalHandlers
# main loop, executed in a dedicated greenlet # main loop, executed in a dedicated greenlet
from twisted.internet import reactor from twisted.internet import reactor
assert Hub.state in [1, 3], ('run function is not reentrant', Hub.state) assert Hub.state in [1, 3], ('run function is not reentrant', Hub.state)
@@ -274,7 +249,7 @@ class TwistedHub(BaseTwistedHub):
# the main loop at the next switch. # the main loop at the next switch.
Hub.state = 3 Hub.state = 3
raise raise
# clean exit here is needed for abort() method to work # clean exit here is needed for abort() method to work
# do not raise an exception here. # do not raise an exception here.

View File

@@ -1,8 +1,29 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Advanced coroutine control. """Advanced coroutine control.
This module provides means to spawn, kill and link coroutines. Linking is an This module provides means to spawn, kill and link coroutines. Linking means
act of subscribing to the coroutine's result, either in form of return value subscribing to the coroutine's result, either in form of return value or
or unhandled exception. unhandled exception.
To create a linkable coroutine use spawn function provided by this module: To create a linkable coroutine use spawn function provided by this module:
@@ -13,7 +34,7 @@ To create a linkable coroutine use spawn function provided by this module:
The return value of spawn is an instance of Proc class that you can "link": The return value of spawn is an instance of Proc class that you can "link":
* p.link(obj) - notify obj when the coroutine is finished * p.link(obj) - notify obj when the coroutine is finished
What does "notify" means here depends on the type of `obj': a callable is What does "notify" means here depends on the type of `obj': a callable is
simply called, an event or a queue is notified using send/send_exception simply called, an event or a queue is notified using send/send_exception
@@ -29,95 +50,123 @@ Here's an example:
Now, even though `p' is finished it's still possible to link it. In this Now, even though `p' is finished it's still possible to link it. In this
case the notification is performed immediatelly: case the notification is performed immediatelly:
>>> p.link() # without an argument provided, links to the current greenlet >>> p.link()
>>> api.sleep(0)
Traceback (most recent call last): Traceback (most recent call last):
... ...
LinkedCompleted: linked proc 'demofunc' completed successfully LinkedCompleted: linked proc '<function demofunc at 0x...>' completed successfully
There are also link_return and link_raise methods that only deliver a return (Without an argument, link is created to the current greenlet)
There are also link_value and link_exception methods that only deliver a return
value and an unhandled exception respectively (plain `link' deliver both). value and an unhandled exception respectively (plain `link' deliver both).
Suppose we want to spawn a "child" greenlet to do an important part of the task, Suppose we want to spawn a greenlet to do an important part of the task; if it
but it it fails then there's no way to complete the task so the "parent" must fails then there's no way to complete the task so the parent must fail as well;
fail as well; `link_raise' is useful here: `link_exception' is useful here:
>>> p = spawn(demofunc, 1, 0) >>> p = spawn(demofunc, 1, 0)
>>> p.link_raise() >>> p.link_exception()
>>> api.sleep(0.01) >>> api.sleep(0.01)
Traceback (most recent call last): Traceback (most recent call last):
... ...
LinkedFailed: linked proc 'demofunc' failed with ZeroDivisionError LinkedFailed: linked proc '<function demofunc at 0x...>' failed with ZeroDivisionError
One application of linking is `wait' function: link to a bunch of coroutines One application of linking is `wait' function: link to a bunch of coroutines
and wait for all them to complete. Such function is provided by this module. and wait for all them to complete. Such function is provided by this module.
""" """
import sys import sys
from weakref import WeakKeyDictionary, ref
from inspect import getargspec
from eventlet import api, coros from eventlet import api, coros
# XXX works with CancellingTimersGreenlet but won't work with greenlet.greenlet (because of weakref)
__all__ = ['LinkedExited', __all__ = ['LinkedExited',
'LinkedFailed', 'LinkedFailed',
'LinkedCompleted', 'LinkedCompleted',
'LinkedKilled', 'LinkedKilled',
'ProcKilled', 'ProcExit',
'wait', 'wait',
'Proc', 'Proc',
'spawn', 'spawn',
'spawn_link', 'spawn_link',
'spawn_link_return', 'spawn_link_value',
'spawn_link_raise'] 'spawn_link_exception']
class LinkedExited(api.GreenletExit): class LinkedExited(api.GreenletExit):
"""linked proc %r exited""" """Raised when a linked proc exits"""
msg = "linked proc %r exited"
def __init__(self, msg=None, name=None): def __init__(self, name=None, msg=None):
self.name = name self.name = name
if not msg: if msg is None:
msg = self.__doc__ % self.name msg = self.msg % self.name
api.GreenletExit.__init__(self, msg) api.GreenletExit.__init__(self, msg)
# def __str__(self):
# msg = api.GreenletExit.__str__(self)
# return msg or (self.__doc__ % self.name)
class LinkedFailed(LinkedExited): class LinkedFailed(LinkedExited):
"""linked proc %r failed""" """Raised when a linked proc dies because of unhandled exception"""
msg = "linked proc %r failed"
def __init__(self, name, typ, _value=None, _tb=None): def __init__(self, name, typ, value=None, tb=None):
#msg = '%s with %s: %s' % (self.__doc__ % self.name, typ.__name__, value) msg = '%s with %s' % ((self.msg % name), typ.__name__)
msg = '%s with %s' % ((self.__doc__ % name), typ.__name__) LinkedExited.__init__(self, name, msg)
LinkedExited.__init__(self, msg, name)
class LinkedCompleted(LinkedExited): class LinkedCompleted(LinkedExited):
"""linked proc %r completed successfully""" """Raised when a linked proc finishes the execution cleanly"""
class LinkedKilled(LinkedCompleted): msg = "linked proc %r completed successfully"
"""linked proc %r was killed"""
# This is a subclass of LinkedCompleted, because GreenletExit is returned,
# not re-raised.
class ProcKilled(api.GreenletExit): class LinkedKilled(LinkedFailed):
"""this proc was killed""" """Raised when a linked proc dies because of unhandled GreenletExit
(i.e. it was killed)
"""
msg = """linked proc %r was killed"""
def wait(linkable_or_list, trap_errors=False): def getLinkedFailed(name, typ, value=None, tb=None):
if hasattr(linkable_or_list, 'link'): if issubclass(typ, api.GreenletExit):
event = coros.event() return LinkedKilled(name, typ, value, tb)
linkable_or_list.link(event) return LinkedFailed(name, typ, value, tb)
try:
return event.wait()
except Exception: class ProcExit(api.GreenletExit):
if trap_errors: """Raised when this proc is killed."""
return
raise SUCCESS, FAILURE = range(2)
class Link(object):
def __init__(self, listener):
self.listener = listener
def _fire(self, source, tag, result):
if tag is SUCCESS:
self._fire_value(source, result)
elif tag is FAILURE:
self._fire_exception(source, result)
else:
raise RuntimeError('invalid arguments to _fire: %r %s %r %r' % (self, source, tag, result))
__call__ = _fire
class LinkToEvent(Link):
def _fire_value(self, source, value):
self.listener.send(value)
def _fire_exception(self, source, throw_args):
self.listener.send_exception(*throw_args)
class LinkToGreenlet(Link):
def _fire_value(self, source, value):
self.listener.throw(LinkedCompleted(source))
def _fire_exception(self, source, throw_args):
self.listener.throw(getLinkedFailed(source, *throw_args))
def waitall(lst, trap_errors=False):
queue = coros.queue() queue = coros.queue()
results = [None] * len(linkable_or_list) results = [None] * len(lst)
for (index, linkable) in enumerate(linkable_or_list): for (index, linkable) in enumerate(lst):
linkable.link(decorate_send(queue, index), weak=False) linkable.link(decorate_send(queue, index))
count = 0 count = 0
while count < len(linkable_or_list): while count < len(lst):
try: try:
index, value = queue.wait() index, value = queue.wait()
except Exception: except Exception:
@@ -129,12 +178,15 @@ def wait(linkable_or_list, trap_errors=False):
return results return results
class decorate_send(object): class decorate_send(object):
#__slots__ = ['_event', '_tag', '__weakref__']
def __init__(self, event, tag): def __init__(self, event, tag):
self._event = event self._event = event
self._tag = tag self._tag = tag
def __repr__(self):
params = (type(self).__name__, self._tag, self._event)
return '<%s tag=%r event=%r>' % params
def __getattr__(self, name): def __getattr__(self, name):
assert name != '_event' assert name != '_event'
return getattr(self._event, name) return getattr(self._event, name)
@@ -143,7 +195,6 @@ class decorate_send(object):
self._event.send((self._tag, value)) self._event.send((self._tag, value))
greenlet_class = api.CancellingTimersGreenlet # greenlet.greenlet
_NOT_USED = object() _NOT_USED = object()
def spawn_greenlet(function, *args): def spawn_greenlet(function, *args):
@@ -151,21 +202,197 @@ def spawn_greenlet(function, *args):
The current greenlet won't be unscheduled. Keyword arguments aren't The current greenlet won't be unscheduled. Keyword arguments aren't
supported (limitation of greenlet), use api.spawn to work around that. supported (limitation of greenlet), use api.spawn to work around that.
""" """
g = greenlet_class(function) g = api.Greenlet(function)
g.parent = api.get_hub().greenlet g.parent = api.get_hub().greenlet
api.get_hub().schedule_call_global(0, g.switch, *args) api.get_hub().schedule_call_global(0, g.switch, *args)
return g return g
class Proc(object): class Source(object):
"""Maintain a set of links to the listeners. Delegate the sent value or
the exception to all of them.
To set up a link, use link_value, link_exception or link method. The
latter establishes both "value" and "exception" link. It is possible to
link to events, queues, greenlets and callables.
>>> source = Source()
>>> event = coros.event()
>>> source.link(event)
Once source's send or send_exception method is called, all the listeners
with the right type of link will be notified ("right type" means that
exceptions won't be delivered to "value" links and values won't be
delivered to "exception" links). Once link has been fired it is removed.
Notifying listeners is performed in the MAINLOOP greenlet. As such it
must not block or call any functions that block. Under the hood notifying
a link means executing a callback, see Link class for details. Notification
must not attempt to switch to the hub, i.e. call any of blocking functions.
>>> source.send('hello')
>>> event.wait()
'hello'
Any error happened while sending will be logged as a regular unhandled
exception. This won't prevent other links from being fired.
There 3 kinds of listeners supported:
1. If `listener' is a greenlet (regardless if it's a raw greenlet or an
extension like Proc), a subclass of LinkedExited exception is raised
in it.
2. If `listener' is something with send/send_exception methods (event,
queue, Source but not Proc) the relevant method is called.
3. If `listener' is a callable, it is called with 3 arguments (see Link class
for details).
"""
def __init__(self, name=None): def __init__(self, name=None):
self.greenlet_ref = None self.name = name
self._receivers = WeakKeyDictionary() self._value_links = {}
self._exception_links = {}
self._result = _NOT_USED self._result = _NOT_USED
self._exc = None self._exc = None
self._kill_exc = None
self.name = name
def ready(self):
return self._result is not _NOT_USED
def link_value(self, listener=None, link=None):
if self.ready() and self._exc is not None:
return
if listener is None:
listener = api.getcurrent()
if link is None:
link = self.getLink(listener)
self._value_links[listener] = link
if self._result is not _NOT_USED:
self.send(self._result)
def link_exception(self, listener=None, link=None):
if self._result is not _NOT_USED and self._exc is None:
return
if listener is None:
listener = api.getcurrent()
if link is None:
link = self.getLink(listener)
self._exception_links[listener] = link
if self._result is not _NOT_USED:
self.send_exception(*self._exc)
def link(self, listener=None, link=None):
if listener is None:
listener = api.getcurrent()
if link is None:
link = self.getLink(listener)
self._value_links[listener] = link
self._exception_links[listener] = link
if self._result is not _NOT_USED:
if self._exc is None:
self.send(self._result)
else:
self.send_exception(*self._exc)
def unlink(self, listener=None):
if listener is None:
listener = api.getcurrent()
self._value_links.pop(listener, None)
self._exception_links.pop(listener, None)
@staticmethod
def getLink(listener):
if hasattr(listener, 'throw'):
return LinkToGreenlet(listener)
if hasattr(listener, 'send'):
return LinkToEvent(listener)
else:
return listener
def send(self, value):
self._result = value
self._exc = None
api.get_hub().schedule_call_global(0, self._do_send, self._value_links.items(),
SUCCESS, value, self._value_links)
def send_exception(self, *throw_args):
self._result = None
self._exc = throw_args
api.get_hub().schedule_call_global(0, self._do_send, self._exception_links.items(),
FAILURE, throw_args, self._exception_links)
def _do_send(self, links, tag, value, consult):
while links:
listener, link = links.pop()
try:
if listener in consult:
try:
link(self.name, tag, value)
finally:
consult.pop(listener, None)
except:
api.get_hub().schedule_call_global(0, self._do_send, links, tag, value, consult)
raise
def wait(self, timeout=None, *throw_args):
"""Wait until send() or send_exception() is called or `timeout' has
expired. Return the argument of send or raise the argument of
send_exception. If timeout has expired, None is returned.
The arguments, when provided, specify how many seconds to wait and what
to do when timeout has expired. They are treated the same way as
api.timeout treats them.
"""
if self._result is not _NOT_USED:
if self._exc is None:
return self._result
else:
api.getcurrent().throw(*self._exc)
if timeout==0:
return
if timeout is not None:
timer = api.timeout(timeout, *throw_args)
timer.__enter__()
EXC = True
try:
try:
event = coros.event()
self.link(event)
try:
return event.wait()
finally:
self.unlink(event)
except:
EXC = False
if timeout is None or not timer.__exit__(*sys.exc_info()):
raise
finally:
if timeout is not None and EXC:
timer.__exit__(None, None, None)
class Proc(Source):
"""A linkable coroutine based on Source.
Upon completion, delivers coroutine's result to the listeners.
"""
def __init__(self, name=None):
self.greenlet = None
Source.__init__(self, name)
def __nonzero__(self):
if self.ready():
# with current _run this does not makes any difference
# still, let keep it there
return False
# otherwise bool(proc) is the same as bool(greenlet)
if self.greenlet is not None:
return bool(self.greenlet)
@property
def dead(self):
return self.ready() or self.greenlet.dead
@classmethod @classmethod
def spawn(cls, function, *args, **kwargs): def spawn(cls, function, *args, **kwargs):
"""Return a new Proc instance that is scheduled to execute """Return a new Proc instance that is scheduled to execute
@@ -177,394 +404,50 @@ class Proc(object):
def run(self, function, *args, **kwargs): def run(self, function, *args, **kwargs):
"""Create a new greenlet to execute `function(*args, **kwargs)'. """Create a new greenlet to execute `function(*args, **kwargs)'.
Newly created greenlet is scheduled upon the next hub iteration, so The created greenlet is scheduled to run upon the next hub iteration.
the current greenlet won't be unscheduled.
""" """
assert self.greenlet_ref is None, "'run' can only be called once per instance" assert self.greenlet is None, "'run' can only be called once per instance"
g = spawn_greenlet(self._run, function, args, kwargs)
self.greenlet_ref = ref(g)
if self.name is None: if self.name is None:
self.name = getattr(function, '__name__', None) self.name = str(function)
if self.name is None: self.greenlet = spawn_greenlet(self._run, function, args, kwargs)
self.name = getattr(type(function), '__name__', '<unknown>')
# return timer from schedule_call_global here?
def _run(self, function, args, kwargs): def _run(self, function, args, kwargs):
"""Execute *function* and send its result to receivers. If function """Internal top level function.
raises GreenletExit it's trapped and treated as a regular value. Execute *function* and send its result to the listeners.
""" """
try: try:
result = function(*args, **kwargs) result = function(*args, **kwargs)
except api.GreenletExit, ex:
self._result = ex
self._kill_exc = LinkedKilled(name=self.name)
self._deliver_result()
except: except:
self._result = None self.send_exception(*sys.exc_info())
self._exc = sys.exc_info()
self._kill_exc = LinkedFailed(self.name, *sys.exc_info())
self._deliver_exception()
raise # let mainloop log the exception raise # let mainloop log the exception
else: else:
self._result = result self.send(result)
self._kill_exc = LinkedCompleted(name=self.name)
self._deliver_result()
# spawn_later/run_later can be also implemented here def throw(self, *throw_args):
"""Used internally to raise the exception.
@property Behaves exactly like greenlet's 'throw' with the exception that ProcExit
def greenlet(self): is raised by default. Do not use this function as it leaves the current
if self.greenlet_ref is not None: greenlet unscheduled forever. Use kill() method instead.
return self.greenlet_ref() """
if not self.dead:
@property if not throw_args:
def ready(self): throw_args = (ProcExit, )
return self._result is not _NOT_USED self.greenlet.throw(*throw_args)
def __nonzero__(self):
if self.ready:
# greenlet's function may already finish yet the greenlet is still alive
# delivering the result to receivers (if some of send methods were blocking)
# we consider such greenlet finished
return False
# otherwise bool(proc) is the same as bool(greenlet)
if self.greenlet is not None:
return bool(self.greenlet)
def _repr_helper(self):
klass = type(self).__name__
if self.greenlet is not None and self.greenlet.dead:
dead = '(dead)'
else:
dead = ''
result = ''
if self._result is not _NOT_USED:
if self._exc is None:
result = ' result=%r' % self._result
else:
result = ' failed'
return '%s greenlet=%r%s rcvrs=%s%s' % (klass, self.greenlet, dead, len(self._receivers), result)
def __repr__(self):
return '<%s>' % (self._repr_helper())
def kill(self, *throw_args): def kill(self, *throw_args):
"""Raise ProcKilled exception (a subclass of GreenletExit) in this """Raise an exception in the greenlet. Unschedule the current greenlet
greenlet that will cause it to die. When this function returns, so that this Proc can handle the exception (or die).
the greenlet is usually dead, unless it catched GreenletExit.
The exception can be specified with throw_args. By default, ProcExit is
raised.
""" """
greenlet = self.greenlet if not self.dead:
if greenlet is not None and not self.ready:
if not throw_args: if not throw_args:
throw_args = (ProcKilled, ) throw_args = (ProcExit, )
return api.kill(greenlet, *throw_args) api.get_hub().schedule_call_global(0, self.greenlet.throw, *throw_args)
if api.getcurrent() is not api.get_hub().greenlet:
def link_return(self, listener=None, weak=None): api.sleep(0)
"""Establish a link between this Proc and `listener' (the current
greenlet by default), such that `listener' will receive a notification
when this Proc exits cleanly or killed with GreenletExit or a subclass.
Any previous link is discarded, so calling link_return and then
link_raise is not the same as calling link.
See `link' function for more details.
"""
if listener is None:
listener = api.getcurrent()
if listener is self:
raise ValueError("Linking to self is pointless")
if self._result is not _NOT_USED and self._exc is not None:
return
deliverer = _get_deliverer_for_value(listener, weak)
if self._result is not _NOT_USED:
deliverer.deliver_value(listener, self._result, self._kill_exc)
else:
self._receivers[listener] = deliverer
# add link_completed link_killed ?
def link_raise(self, listener=None, weak=None):
"""Establish a link between this Proc and `listener' (the current
greenlet by default), such that `listener' will receive a notification
when this Proc exits because of unhandled exception. Note, that
unhandled GreenletExit (or a subclass) is a special case and and will
not be re-raised. No link will be established if the Proc has already
exited cleanly or was killed.
Any previous link is discarded, so calling link_return and then
link_raise is not the same as calling link.
See `link' function for more details.
"""
if listener is None:
listener = api.getcurrent()
if listener is self:
raise ValueError("Linking to self is pointless")
if self._result is not _NOT_USED and self._exc is None:
return
deliverer = _get_deliverer_for_error(listener, weak)
if self._result is not _NOT_USED:
deliverer.deliver_error(listener, self._exc, self._kill_exc)
else:
self._receivers[listener] = deliverer
def link(self, listener=None, weak=None):
"""Establish a link between this Proc and `listener' (the current
greenlet by default), such that `listener' will receive a notification
when this Proc exits.
The can be only one link from this Proc to `listener'. A new link
discards a previous link if there was one. After the notification is
performed the link is no longer needed and is removed.
How a notification is delivered depends on the type of `listener':
1. If `listener' is an event or a queue or something else with
send/send_exception methods, these are used to deliver the result.
2. If `listener' is a Proc or a greenlet or something else with
throw method then it's used to raise a subclass of LinkedExited;
whichever subclass is used depends on how this Proc died.
3. If `listener' is a callable, it is called with one argument if this
greenlet exits cleanly or with 3 arguments (typ, val, tb) if this
greenlet dies because of an unhandled exception.
Note that the subclasses of GreenletExit are delivered as return values.
If `weak' is True, Proc stores the strong reference to the listener;
if `weak' is False, then a weakref is used and no new references to
the `listener' are created. Such link will disappear when `listener'
disappers.
if `weak' argument is not provided or is None then weak link is
created unless it's impossible to do so or `listener' is callable.
To ignore unhandled exceptions use `link_return' method. To receive only
the exception and not return values or GreenletExits use `link_raise' method.
Note, that GreenletExit is treated specially and is delivered as a value,
not as an exception (i.e. send method is used to deliver it and not
send_exception).
"""
if listener is None:
listener = api.getcurrent()
if listener is self:
raise ValueError("Linking to self is pointless")
deliverer = _get_deliverer_for_any(listener, weak)
if self._result is not _NOT_USED:
if self._exc is None:
deliverer.deliver_value(listener, self._result, self._kill_exc)
else:
deliverer.deliver_error(listener, self._exc, self._kill_exc)
else:
self._receivers[listener] = deliverer
# XXX check how many arguments listener accepts: for link must be one or 3
# for link_return must be 1, for link_raise must be 3, toherwise raise TypeError
def unlink(self, listener=None):
if listener is None:
listener = api.getcurrent()
self._receivers.pop(listener, None)
def __enter__(self):
self.link()
def __exit__(self, *args):
self.unlink()
# add send/send_exception here
def wait(self):
if self._result is _NOT_USED:
event = coros.event()
self.link(event)
return event.wait()
elif self._exc is None:
return self._result
else:
api.getcurrent().throw(*self._exc)
def poll(self, notready=None):
if self._result is not _NOT_USED:
if self._exc is None:
return self._result
else:
api.getcurrent().throw(*self._exc)
return notready
def _deliver_result(self):
while self._receivers:
listener, deliverer = self._receivers.popitem()
try:
deliverer.deliver_value(listener, self._result, self._kill_exc)
except:
# this greenlet has to die so that the error is logged by the hub
# spawn a new greenlet to finish the job
if self._receivers:
spawn(self._deliver_result)
raise
def _deliver_exception(self):
while self._receivers:
listener, deliverer = self._receivers.popitem()
try:
deliverer.deliver_error(listener, self._exc, self._kill_exc)
except:
# this greenlet has to die so that the exception will be logged
# the original exception is, however, lost
# spawn a new greenlet to finish the job
if self._receivers:
spawn_greenlet(self._deliver_exception)
raise
# XXX the following is not exactly object-oriented
# XXX add __deliver_error__ and __deliver_result__ methods to event, queue, Proc?
# would still need special cases for callback and greenlet
# QQQ add __call__ to event (and queue) such that it can be treated as callable by link()?
# QQQ add better yet, add send/send_exception to Proc
def argnum(func):
"""Return minimal and maximum number of args that func can accept
>>> (0, sys.maxint) == argnum(lambda *args: None)
True
>>> argnum(lambda x: None)
(1, 1)
>>> argnum(lambda x, y, z=5, a=6: None)
(2, 4)
"""
args, varargs, varkw, defaults = getargspec(func)
if varargs is not None:
return 0, sys.maxint
return len(args)-len(defaults or []), len(args)
def _get_deliverer_for_value(listener, weak):
if hasattr(listener, 'send'):
return _deliver_value_to_event(listener, weak)
elif hasattr(listener, 'greenlet_ref'):
return _deliver_value_to_proc(listener, weak)
elif hasattr(listener, 'throw'):
return _deliver_value_to_greenlet(listener, weak)
elif callable(listener):
min, max = argnum(listener)
if min <= 1 <= max:
return _deliver_value_to_callback(listener, weak)
raise TypeError('function must support one argument: %r' % listener)
else:
raise TypeError('Cannot link to %r' % (listener, ))
def _get_deliverer_for_error(listener, weak):
if hasattr(listener, 'send_exception'):
return _deliver_error_to_event(listener, weak)
elif hasattr(listener, 'greenlet_ref'):
return _deliver_error_to_proc(listener, weak)
elif hasattr(listener, 'throw'):
return _deliver_error_to_greenlet(listener, weak)
elif callable(listener):
min, max = argnum(listener)
if min <= 3 <= max:
return _deliver_error_to_callback(listener, weak)
raise TypeError('function must support three arguments: %r' % listener)
else:
raise TypeError('Cannot link to %r' % (listener, ))
def _get_deliverer_for_any(listener, weak):
if hasattr(listener, 'send') and hasattr(listener, 'send_exception'):
return _deliver_to_event(listener, weak)
elif hasattr(listener, 'greenlet_ref'):
return _deliver_to_proc(listener, weak)
elif hasattr(listener, 'throw'):
return _deliver_to_greenlet(listener, weak)
elif callable(listener):
min, max = argnum(listener)
if min <= 1 and 3 <= max:
return _deliver_to_callback(listener, weak)
raise TypeError('function must support one or three arguments: %r' % listener)
else:
raise TypeError('Cannot link to %r' % (listener, ))
noop = staticmethod(lambda *args: None)
class _base:
weak = True
def __new__(cls, listener, weak):
if weak is None:
weak = cls.weak
if weak:
return cls
return cls(listener)
def __init__(self, listener, weak):
assert not weak, 'for weak links just return the class object, no need for an instance'
self._hold_ref = listener
class _deliver_to_callback(_base):
weak = False
@staticmethod
def deliver_value(callback, value, _):
callback(value)
@staticmethod
def deliver_error(callback, throw_args, _):
callback(*throw_args)
class _deliver_value_to_callback(_deliver_to_callback):
deliver_error = noop
class _deliver_error_to_callback(_deliver_to_callback):
deliver_value = noop
class _deliver_to_event(_base):
@staticmethod
def deliver_value(event, value, _):
event.send(value)
@staticmethod
def deliver_error(event, throw_args, _):
event.send_exception(*throw_args)
class _deliver_value_to_event(_deliver_to_event):
deliver_error = noop
class _deliver_error_to_event(_deliver_to_event):
deliver_value = noop
def _deliver_kill_exc_to_greenlet(greenlet, _, kill_exc):
if greenlet is api.getcurrent():
raise kill_exc
elif greenlet is not None:
if greenlet.dead:
return
# if greenlet was not started, we still want to schedule throw
# BUG: if greenlet was unlinked must not throw
api.get_hub().schedule_call_global(0, greenlet.throw, kill_exc)
class _deliver_to_greenlet(_base):
deliver_value = staticmethod(_deliver_kill_exc_to_greenlet)
deliver_error = staticmethod(_deliver_kill_exc_to_greenlet)
class _deliver_value_to_greenlet(_deliver_to_greenlet):
deliver_error = noop
class _deliver_error_to_greenlet(_deliver_to_greenlet):
deliver_value = noop
def _deliver_kill_exc_to_proc(proc, _, kill_exc):
_deliver_kill_exc_to_greenlet(proc.greenlet, _, kill_exc)
class _deliver_to_proc(_base):
deliver_value = staticmethod(_deliver_kill_exc_to_proc)
deliver_error = staticmethod(_deliver_kill_exc_to_proc)
class _deliver_value_to_proc(_deliver_to_proc):
deliver_error = noop
class _deliver_error_to_proc(_deliver_to_proc):
deliver_value = noop
spawn = Proc.spawn spawn = Proc.spawn
@@ -573,14 +456,14 @@ def spawn_link(function, *args, **kwargs):
p.link() p.link()
return p return p
def spawn_link_return(function, *args, **kwargs): def spawn_link_value(function, *args, **kwargs):
p = spawn(function, *args, **kwargs) p = spawn(function, *args, **kwargs)
p.link_return() p.link_value()
return p return p
def spawn_link_raise(function, *args, **kwargs): def spawn_link_exception(function, *args, **kwargs):
p = spawn(function, *args, **kwargs) p = spawn(function, *args, **kwargs)
p.link_raise() p.link_exception()
return p return p
@@ -597,30 +480,6 @@ class Pool(object):
g.link(lambda *_args: self.semaphore.release()) g.link(lambda *_args: self.semaphore.release())
return g return g
# not fully supports all types of listeners
def forward(queue, listener, tag):
while True:
try:
result = queue.wait()
except Exception:
listener.send_exception(*sys.exc_info())
else:
listener.send((tag, result))
# class Supervisor(object):
# max_restarts=3
# max_restarts_period=30
#
# def __init__(self, max_restarts=None, max_restarts_period=None):
# if max_restarts is not None:
# self.max_restarts = max_restarts
# if max_restarts_period is not None:
# self.max_restarts_period = max_restarts_period
#
#def spawn_child(self, function, *args, **kwargs):
# def supervise(self, proc, max_restarts, max_restarts_period, restarts_delay):
if __name__=='__main__': if __name__=='__main__':
import doctest import doctest

View File

@@ -22,7 +22,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
from eventlet.api import get_hub from eventlet.api import get_hub, getcurrent
""" If true, captures a stack trace for each timer when constructed. This is """ If true, captures a stack trace for each timer when constructed. This is
useful for debugging leaking timers, to find out where the timer was set up. """ useful for debugging leaking timers, to find out where the timer was set up. """
@@ -40,7 +40,7 @@ class Timer(object):
This timer will not be run unless it is scheduled in a runloop by This timer will not be run unless it is scheduled in a runloop by
calling timer.schedule() or runloop.add_timer(timer). calling timer.schedule() or runloop.add_timer(timer).
""" """
self.cancelled = False self._cancelled = False
self.seconds = seconds self.seconds = seconds
self.tpl = cb, args, kw self.tpl = cb, args, kw
self.called = False self.called = False
@@ -49,6 +49,10 @@ class Timer(object):
self.traceback = cStringIO.StringIO() self.traceback = cStringIO.StringIO()
traceback.print_stack(file=self.traceback) traceback.print_stack(file=self.traceback)
@property
def cancelled(self):
return self._cancelled
def __repr__(self): def __repr__(self):
secs = getattr(self, 'seconds', None) secs = getattr(self, 'seconds', None)
cb, args, kw = getattr(self, 'tpl', (None, None, None)) cb, args, kw = getattr(self, 'tpl', (None, None, None))
@@ -82,10 +86,38 @@ class Timer(object):
"""Prevent this timer from being called. If the timer has already """Prevent this timer from being called. If the timer has already
been called, has no effect. been called, has no effect.
""" """
self.cancelled = True self._cancelled = True
self.called = True self.called = True
get_hub().timer_canceled(self) get_hub().timer_canceled(self)
try: try:
del self.tpl del self.tpl
except AttributeError: except AttributeError:
pass pass
class LocalTimer(Timer):
def __init__(self, *args, **kwargs):
self.greenlet = getcurrent()
Timer.__init__(self, *args, **kwargs)
@property
def cancelled(self):
if self.greenlet is None or self.greenlet.dead:
return True
return self._cancelled
def __call__(self, *args):
if not self.called:
self.called = True
if self.greenlet is not None and self.greenlet.dead:
return
cb, args, kw = self.tpl
try:
cb(*args, **kw)
finally:
get_hub().timer_finished(self)
def cancel(self):
self.greenlet = None
Timer.cancel(self)

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from twisted.internet import defer from twisted.internet import defer
from twisted.python import failure from twisted.python import failure
from eventlet.support.greenlet import greenlet from eventlet.support.greenlet import greenlet

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Integrate eventlet with twisted's reactor mainloop. """Integrate eventlet with twisted's reactor mainloop.
You generally don't have to use it unless you need to call reactor.run() You generally don't have to use it unless you need to call reactor.run()

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Basic twisted protocols converted to synchronous mode""" """Basic twisted protocols converted to synchronous mode"""
import sys import sys
from twisted.internet.protocol import Protocol as twistedProtocol from twisted.internet.protocol import Protocol as twistedProtocol
@@ -166,16 +187,17 @@ class GreenTransport(GreenTransportBase):
if self._queue is not None: if self._queue is not None:
resumed = False resumed = False
try: try:
while len(self._buffer) < size or size < 0: try:
if not resumed: while len(self._buffer) < size or size < 0:
self.resumeProducing() if not resumed:
resumed = True self.resumeProducing()
self._buffer += self._wait() resumed = True
except ConnectionDone: self._buffer += self._wait()
self._queue = None except ConnectionDone:
except: self._queue = None
self._queue = None except:
self._error = sys.exc_info() self._queue = None
self._error = sys.exc_info()
finally: finally:
if resumed: if resumed:
self.pauseProducing() self.pauseProducing()
@@ -193,14 +215,15 @@ class GreenTransport(GreenTransportBase):
if self._queue is not None and not self._buffer: if self._queue is not None and not self._buffer:
self.resumeProducing() self.resumeProducing()
try: try:
recvd = self._wait() try:
#print 'received %r' % recvd recvd = self._wait()
self._buffer += recvd #print 'received %r' % recvd
except ConnectionDone: self._buffer += recvd
self._queue = None except ConnectionDone:
except: self._queue = None
self._queue = None except:
self._error = sys.exc_info() self._queue = None
self._error = sys.exc_info()
finally: finally:
self.pauseProducing() self.pauseProducing()
if buflen is None: if buflen is None:

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from twisted.protocols import basic from twisted.protocols import basic
from twisted.internet.error import ConnectionDone from twisted.internet.error import ConnectionDone
from eventlet.twistedutil.protocol import GreenTransportBase from eventlet.twistedutil.protocol import GreenTransportBase

View File

@@ -1,27 +1,52 @@
"""Spawn multiple greenlet-workers and collect their results. # Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
Demonstrates how to use eventlet.green package and coros.Job. """Spawn multiple workers and collect their results.
Demonstrates how to use eventlet.green package and proc module.
""" """
from eventlet import proc
from eventlet.green import socket from eventlet.green import socket
from eventlet.coros import Job
# this example works with both standard eventlet hubs and with twisted-based hub # this example works with both standard eventlet hubs and with twisted-based hub
# comment out the following line to use standard eventlet hub # uncomment the following line to use twisted hub
from twisted.internet import reactor #from twisted.internet import reactor
def geturl(url): def geturl(url):
c = socket.socket() c = socket.socket()
ip = socket.gethostbyname(url) ip = socket.gethostbyname(url)
c.connect((ip, 80)) c.connect((ip, 80))
print '%s connected' % url
c.send('GET /\r\n\r\n') c.send('GET /\r\n\r\n')
return c.recv(1024) return c.recv(1024)
urls = ['www.google.com', 'www.yandex.ru', 'www.python.org'] urls = ['www.google.com', 'www.yandex.ru', 'www.python.org']
jobs = [Job.spawn_new(geturl, x) for x in urls] jobs = [proc.spawn(geturl, x) for x in urls]
print 'spawned %s jobs' % len(jobs) print 'spawned %s jobs' % len(jobs)
# collect the results from workers, one by one # collect the results from workers
for url, job in zip(urls, jobs): results = proc.waitall(jobs)
print '%s: %s' % (url, repr(job.wait())[:50]) # Note, that any exception in the workers will be reraised by waitall
# unless trap_errors argument specifies otherwise
for url, result in zip(urls, results):
print '%s: %s' % (url, repr(result)[:50])

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Listen on port 8888 and pretend to be an HTTP proxy. """Listen on port 8888 and pretend to be an HTTP proxy.
It even works for some pages. It even works for some pages.
@@ -63,5 +84,6 @@ def format_response(response):
class MyFactory(Factory): class MyFactory(Factory):
protocol = LineOnlyReceiver protocol = LineOnlyReceiver
print __doc__
reactor.listenTCP(8888, MyFactory()) reactor.listenTCP(8888, MyFactory())
reactor.run() reactor.run()

View File

@@ -1,11 +1,32 @@
#!/usr/bin/python #!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Port forwarder """Port forwarder
USAGE: twisted_portforward.py local_port remote_host remote_port""" USAGE: twisted_portforward.py local_port remote_host remote_port"""
import sys import sys
from twisted.internet import reactor from twisted.internet import reactor
from eventlet.coros import Job
from eventlet.twistedutil import join_reactor from eventlet.twistedutil import join_reactor
from eventlet.twistedutil.protocol import GreenClientCreator, SpawnFactory, UnbufferedTransport from eventlet.twistedutil.protocol import GreenClientCreator, SpawnFactory, UnbufferedTransport
from eventlet import proc
def forward(source, dest): def forward(source, dest):
try: try:
@@ -22,10 +43,9 @@ def handler(local):
client = str(local.getHost()) client = str(local.getHost())
print 'accepted connection from %s' % client print 'accepted connection from %s' % client
remote = GreenClientCreator(reactor, UnbufferedTransport).connectTCP(remote_host, remote_port) remote = GreenClientCreator(reactor, UnbufferedTransport).connectTCP(remote_host, remote_port)
a = Job.spawn_new(forward, remote, local) a = proc.spawn(forward, remote, local)
b = Job.spawn_new(forward, local, remote) b = proc.spawn(forward, local, remote)
a.wait() proc.waitall([a, b], trap_errors=True)
b.wait()
print 'closed connection to %s' % client print 'closed connection to %s' % client
try: try:

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Simple chat demo application. """Simple chat demo application.
Listen on port 8007 and re-send all the data received to other participants. Listen on port 8007 and re-send all the data received to other participants.
@@ -33,6 +54,7 @@ class Chat:
finally: finally:
self.participants.remove(conn) self.participants.remove(conn)
print __doc__
chat = Chat() chat = Chat()
from twisted.internet import reactor from twisted.internet import reactor
reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverTransport)) reactor.listenTCP(8007, SpawnFactory(chat.handler, LineOnlyReceiverTransport))

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from twisted.internet import reactor from twisted.internet import reactor
from twisted.names.srvconnect import SRVConnector from twisted.names.srvconnect import SRVConnector
from gnutls.interfaces.twisted import X509Credentials from gnutls.interfaces.twisted import X509Credentials
@@ -8,21 +29,16 @@ from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
class NoisySRVConnector(SRVConnector): class NoisySRVConnector(SRVConnector):
def _ebGotServers(self, failure): def _ebGotServers(self, failure):
#self.failure = failure
return SRVConnector._ebGotServers(self, failure) return SRVConnector._ebGotServers(self, failure)
def pickServer(self): def pickServer(self):
host, port = SRVConnector.pickServer(self) host, port = SRVConnector.pickServer(self)
#if not isinstance(port, int) and self.failure:
# self.failure.raiseException()
print 'Resolved _%s._%s.%s --> %s:%s' % (self.service, self.protocol, self.domain, host, port) print 'Resolved _%s._%s.%s --> %s:%s' % (self.service, self.protocol, self.domain, host, port)
return host, port return host, port
# why TypeError is not raised here?
cred = X509Credentials(None, None) cred = X509Credentials(None, None)
creator = GreenClientCreator(reactor, LineOnlyReceiverTransport) creator = GreenClientCreator(reactor, LineOnlyReceiverTransport)
conn = creator.connectSRV('msrpsx', 'ag-projects.com', conn = creator.connectSRV('msrps', 'ag-projects.com',
connectFuncName='connectTLS', connectFuncArgs=(cred,), connectFuncName='connectTLS', connectFuncArgs=(cred,),
ConnectorClass=NoisySRVConnector) ConnectorClass=NoisySRVConnector)

View File

@@ -1,5 +1,27 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# package is named greentest, not test, so it won't be confused with test in stdlib # package is named greentest, not test, so it won't be confused with test in stdlib
import sys import sys
import unittest
disabled_marker = '-*-*-*-*-*- disabled -*-*-*-*-*-' disabled_marker = '-*-*-*-*-*- disabled -*-*-*-*-*-'
def exit_disabled(): def exit_disabled():
@@ -10,4 +32,18 @@ def exit_unless_twisted():
if 'Twisted' not in type(get_hub()).__name__: if 'Twisted' not in type(get_hub()).__name__:
exit_disabled() exit_disabled()
def exit_unless_25():
print sys.version_info[:2]<(2, 5)
if sys.version_info[:2]<(2, 5):
exit_disabled()
class LimitedTestCase(unittest.TestCase):
def setUp(self):
from eventlet import api
self.timer = api.exc_after(1, RuntimeError('test is taking too long'))
def tearDown(self):
self.timer.cancel()

View File

@@ -65,15 +65,18 @@ class TestEvent(tests.TestCase):
self.assertEqual(len(results), count) self.assertEqual(len(results), count)
def test_cancel(self): # commented out, not fixed because it's unclear what event.cancel(waiter) should do
evt = coros.event() # (docstring and the code say different things) and because cancel() as implemented now
# close over the current coro so we can cancel it explicitly # has a bug
current = api.getcurrent() # def test_cancel(self):
def cancel_event(): # evt = coros.event()
evt.cancel(current) # # close over the current coro so we can cancel it explicitly
api.spawn(cancel_event) # current = api.getcurrent()
# def cancel_event():
self.assertRaises(coros.Cancelled, evt.wait) # evt.cancel(current)
# api.spawn(cancel_event)
#
# self.assertRaises(coros.Cancelled, evt.wait)
def test_reset(self): def test_reset(self):
evt = coros.event() evt = coros.event()
@@ -154,16 +157,17 @@ class TestCoroutinePool(tests.TestCase):
done.wait() done.wait()
self.assertEquals(['cons1', 'prod', 'cons2'], results) self.assertEquals(['cons1', 'prod', 'cons2'], results)
def test_timer_cancel(self): # since CoroutinePool does not kill the greenlet, the following does not work
def some_work(): # def test_timer_cancel(self):
t = timer.Timer(5, lambda: None) # def some_work():
t.schedule() # t = timer.LocalTimer(5, lambda: None)
return t # t.schedule()
pool = coros.CoroutinePool(0, 2) # return t
worker = pool.execute(some_work) # pool = coros.CoroutinePool(0, 2)
t = worker.wait() # worker = pool.execute(some_work)
api.sleep(0) # t = worker.wait()
self.assertEquals(t.cancelled, True) # api.sleep(0)
# self.assertEquals(t.cancelled, True)
def test_reentrant(self): def test_reentrant(self):
pool = coros.CoroutinePool(0,1) pool = coros.CoroutinePool(0,1)

View File

@@ -1,4 +1,25 @@
#!/usr/bin/python #!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys import sys
import os import os
import sqlite3 import sqlite3
@@ -41,17 +62,17 @@ def calc_hub_stats(table):
class TestResult: class TestResult:
def __init__(self, runs, errors, fails, timeouts, exitcode=None, id=None, output=None): def __init__(self, runs, errors, fails, timeouts, exitcode=None, id=None, output=None):
self.runs = runs self.runs = max(runs, 0)
self.errors = errors self.errors = max(errors, 0)
self.fails = fails self.fails = max(fails, 0)
self.timeouts = timeouts self.timeouts = max(timeouts, 0)
self.exitcode = exitcode self.exitcode = exitcode
self.id = id self.id = id
self.output = output self.output = output
@property @property
def passed(self): def passed(self):
return self.runs - self.errors - self.fails return max(0, self.runs - self.errors - self.fails)
@property @property
def failed(self): def failed(self):

View File

@@ -1,4 +1,25 @@
#!/usr/bin/python #!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys import sys
import traceback import traceback
import sqlite3 import sqlite3
@@ -7,7 +28,7 @@ import glob
def parse_stdout(s): def parse_stdout(s):
argv = re.search('^===ARGV=(.*?)$', s, re.M).group(1) argv = re.search('^===ARGV=(.*?)$', s, re.M).group(1)
argv = eval(argv) argv = argv.split()
testname = argv[-1] testname = argv[-1]
del argv[-1] del argv[-1]
hub = None hub = None
@@ -86,6 +107,7 @@ def main(db):
except Exception: except Exception:
parse_error += 1 parse_error += 1
sys.stderr.write('Failed to parse id=%s\n' % id) sys.stderr.write('Failed to parse id=%s\n' % id)
print repr(stdout)
traceback.print_exc() traceback.print_exc()
else: else:
print id, hub, testname, runs, errors, fails, timeouts print id, hub, testname, runs, errors, fails, timeouts

View File

@@ -1,11 +1,35 @@
#!/usr/bin/python #!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Run the program and record stdout/stderr/exitcode into the database results.rev_changeset.db """Run the program and record stdout/stderr/exitcode into the database results.rev_changeset.db
Usage: %prog program [args] Usage: %prog program [args]
""" """
import sys import sys
import os import os
import sqlite3 try:
import sqlite3
except ImportError:
import pysqlite2.dbapi2 as sqlite3
import warnings import warnings
from greentest import disabled_marker from greentest import disabled_marker

View File

@@ -1,4 +1,25 @@
#!/usr/bin/python #!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Run tests for different configurations (hub/reactor)""" """Run tests for different configurations (hub/reactor)"""
import sys import sys
import os import os
@@ -12,7 +33,7 @@ from with_eventlet import import_reactor
first_hubs = ['selecthub', 'poll', 'selects', 'twistedr'] first_hubs = ['selecthub', 'poll', 'selects', 'twistedr']
first_reactors = ['selectreactor', 'pollreactor', 'epollreactor'] first_reactors = ['selectreactor', 'pollreactor', 'epollreactor']
COMMAND = './record_results.py ./with_timeout.py ./with_eventlet.py %(setup)s %(test)s' COMMAND = './record_results.py ' + sys.executable + ' ./with_timeout.py ./with_eventlet.py %(setup)s %(test)s'
PARSE_PERIOD = 10 PARSE_PERIOD = 10
# the following aren't in the default list unless --all option present # the following aren't in the default list unless --all option present

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest import unittest
from eventlet.api import sleep, spawn, kill, with_timeout, TimeoutError from eventlet.api import sleep, spawn, kill, with_timeout, TimeoutError

View File

@@ -1,4 +1,26 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import with_statement from __future__ import with_statement
import sys
import unittest import unittest
import weakref import weakref
import time import time

View File

@@ -1,8 +1,29 @@
from __future__ import with_statement # Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest import unittest
from eventlet import api, coros from eventlet import api, coros
from greentest import LimitedTestCase
class TestSemaphore(unittest.TestCase): class TestSemaphore(LimitedTestCase):
def test_bounded(self): def test_bounded(self):
# this was originally semaphore's doctest # this was originally semaphore's doctest
@@ -22,8 +43,7 @@ class TestSemaphore(unittest.TestCase):
def test_bounded_with_zero_limit(self): def test_bounded_with_zero_limit(self):
sem = coros.semaphore(0, 0) sem = coros.semaphore(0, 0)
api.spawn(sem.acquire) api.spawn(sem.acquire)
with api.timeout(0.001): sem.release()
sem.release()
if __name__=='__main__': if __name__=='__main__':

View File

@@ -1,12 +1,32 @@
from __future__ import with_statement # Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest import unittest
import sys from eventlet.coros import event
from eventlet.coros import event, Job, JobGroup from eventlet.api import spawn, sleep, exc_after, with_timeout
from eventlet.api import spawn, sleep, GreenletExit, exc_after, timeout from greentest import LimitedTestCase
DELAY= 0.01 DELAY= 0.01
class TestEvent(unittest.TestCase): class TestEvent(LimitedTestCase):
def test_send_exc(self): def test_send_exc(self):
log = [] log = []
@@ -17,12 +37,13 @@ class TestEvent(unittest.TestCase):
result = e.wait() result = e.wait()
log.append(('received', result)) log.append(('received', result))
except Exception, ex: except Exception, ex:
log.append(('catched', type(ex).__name__)) log.append(('catched', ex))
spawn(waiter) spawn(waiter)
sleep(0) # let waiter to block on e.wait() sleep(0) # let waiter to block on e.wait()
e.send(exc=Exception()) obj = Exception()
e.send(exc=obj)
sleep(0) sleep(0)
assert log == [('catched', 'Exception')], log assert log == [('catched', obj)], log
def test_send(self): def test_send(self):
event1 = event() event1 = event()
@@ -33,138 +54,10 @@ class TestEvent(unittest.TestCase):
try: try:
result = event1.wait() result = event1.wait()
except ValueError: except ValueError:
with timeout(DELAY, None): X = object()
result = event2.wait() result = with_timeout(DELAY, event2.wait, timeout_value=X)
raise AssertionError('Nobody sent anything to event2 yet it received %r' % (result, )) assert result is X, 'Nobody sent anything to event2 yet it received %r' % (result, )
class CommonJobTests:
def test_simple_return(self):
res = self.Job.spawn_new(lambda: 25).wait()
assert res==25, res
def test_exception(self):
try:
self.Job.spawn_new(sys.exit, 'bye').wait()
except SystemExit, ex:
assert ex.args == ('bye', )
else:
assert False, "Shouldn't get there"
def _test_kill(self, sync):
def func():
sleep(DELAY)
return 101
res = self.Job.spawn_new(func)
assert res
if sync:
res.kill()
else:
spawn(res.kill)
wait_result = res.wait()
assert not res, repr(res)
assert isinstance(wait_result, GreenletExit), repr(wait_result)
def test_kill_sync(self):
return self._test_kill(True)
def test_kill_async(self):
return self._test_kill(False)
def test_poll(self):
def func():
sleep(DELAY)
return 25
job = self.Job.spawn_new(func)
self.assertEqual(job.poll(), None)
assert job, repr(job)
self.assertEqual(job.wait(), 25)
self.assertEqual(job.poll(), 25)
assert not job, repr(job)
job = self.Job.spawn_new(func)
self.assertEqual(job.poll(5), 5)
assert job, repr(job)
self.assertEqual(job.wait(), 25)
self.assertEqual(job.poll(5), 25)
assert not job, repr(job)
def test_kill_after(self):
def func():
sleep(DELAY)
return 25
job = self.Job.spawn_new(func)
job.kill_after(DELAY/2)
result = job.wait()
assert isinstance(result, GreenletExit), repr(result)
job = self.Job.spawn_new(func)
job.kill_after(DELAY*2)
self.assertEqual(job.wait(), 25)
sleep(DELAY*2)
self.assertEqual(job.wait(), 25)
class TestJob(CommonJobTests, unittest.TestCase):
def setUp(self):
self.Job = Job
class TestJobGroup(CommonJobTests, unittest.TestCase):
def setUp(self):
self.Job = JobGroup()
def tearDown(self):
del self.Job
def check_raises_badint(self, wait):
try:
wait()
except ValueError, ex:
assert 'badint' in str(ex), str(ex)
else:
raise AssertionError('must raise ValueError')
def check_killed(self, wait, text=''):
result = wait()
assert isinstance(result, GreenletExit), repr(result)
assert str(result) == text, str(result)
def test_group_error(self):
x = self.Job.spawn_new(int, 'badint')
y = self.Job.spawn_new(sleep, DELAY)
self.check_killed(y.wait, 'Killed because of ValueError in the group')
self.check_raises_badint(x.wait)
z = self.Job.spawn_new(sleep, DELAY)
self.check_killed(z.wait, 'Killed because of ValueError in the group')
def test_wait_all(self):
x = self.Job.spawn_new(lambda : 1)
y = self.Job.spawn_new(lambda : 2)
z = self.Job.spawn_new(lambda : 3)
assert self.Job.wait_all() == [1, 2, 3], repr(self.Job.wait_all())
assert [x.wait(), y.wait(), z.wait()] == [1, 2, 3], [x.wait(), y.wait(), z.wait()]
def test_error_wait_all(self):
def x():
sleep(DELAY)
return 1
# x will be killed
x = self.Job.spawn_new(x)
# y will raise ValueError
y = self.Job.spawn_new(int, 'badint')
# z cannot be killed because it does not yield. it will finish successfully
z = self.Job.spawn_new(lambda : 3)
self.check_raises_badint(self.Job.wait_all)
self.check_killed(x.poll, 'Killed because of ValueError in the group')
self.check_killed(x.wait, 'Killed because of ValueError in the group')
assert z.wait() == 3, repr(z.wait())
self.check_raises_badint(y.wait)
# zz won't be even started, because there's already an error in the group
zz = self.Job.spawn_new(lambda : 4)
self.check_killed(x.poll, 'Killed because of ValueError in the group')
self.check_killed(x.wait, 'Killed because of ValueError in the group')
if __name__=='__main__': if __name__=='__main__':
unittest.main() unittest.main()

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Test than modules in eventlet.green package are indeed green. """Test than modules in eventlet.green package are indeed green.
To do that spawn a green server and then access it using a green socket. To do that spawn a green server and then access it using a green socket.
If either operation blocked the whole script would block and timeout. If either operation blocked the whole script would block and timeout.

43
greentest/test__hub.py Normal file
View File

@@ -0,0 +1,43 @@
# Copyright (c) 2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest
from eventlet import api
DELAY = 0.01
class TestScheduleCall(unittest.TestCase):
def test_local(self):
lst = [1]
api.spawn(api.get_hub().schedule_call_local, DELAY, lst.pop)
api.sleep(DELAY*2)
assert lst == [1], lst
def test_global(self):
lst = [1]
api.spawn(api.get_hub().schedule_call_global, DELAY, lst.pop)
api.sleep(DELAY*2)
assert lst == [], lst
if __name__=='__main__':
unittest.main()

View File

@@ -1,18 +1,104 @@
from __future__ import with_statement # Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys import sys
from twisted.internet import reactor
import unittest import unittest
from eventlet.api import sleep, timeout from eventlet.api import sleep, with_timeout
from eventlet import proc, coros from eventlet import api, proc, coros
from greentest import LimitedTestCase
DELAY= 0.001 DELAY = 0.01
class TestCase(unittest.TestCase): class TestEventSource(LimitedTestCase):
def test_send(self):
s = proc.Source()
q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
s.link_value(q1)
assert s.wait(0) is None
assert s.wait(0.001, None) is None
s.send(1)
assert not q1.ready()
assert s.wait()==1
api.sleep(0)
assert q1.ready()
s.link_exception(q2)
s.link(q3)
assert not q2.ready()
api.sleep(0)
assert q3.ready()
assert s.wait()==1
def test_send_exception(self):
s = proc.Source()
q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
s.link_exception(q1)
s.send_exception(OSError('hello'))
api.sleep(0)
assert q1.ready()
s.link_value(q2)
s.link(q3)
assert not q2.ready()
api.sleep(0)
assert q3.ready()
self.assertRaises(OSError, q1.wait)
self.assertRaises(OSError, q3.wait)
self.assertRaises(OSError, s.wait)
class SimpleTestProc(LimitedTestCase):
def test_proc(self):
p = proc.spawn(lambda : 100)
receiver = proc.spawn(api.sleep, 1)
p.link(receiver)
self.assertRaises(proc.LinkedCompleted, receiver.wait)
receiver2 = proc.spawn(api.sleep, 1)
p.link(receiver2)
self.assertRaises(proc.LinkedCompleted, receiver2.wait)
def test_event(self):
p = proc.spawn(lambda : 100)
event = coros.event()
p.link(event)
self.assertEqual(event.wait(), 100)
for i in xrange(3):
event2 = coros.event()
p.link(event2)
self.assertEqual(event2.wait(), 100)
def test_current(self):
p = proc.spawn(lambda : 100)
p.link()
self.assertRaises(proc.LinkedCompleted, sleep, 0.1)
class TestCase(LimitedTestCase):
def link(self, p, listener=None): def link(self, p, listener=None):
getattr(p, self.link_method)(listener) getattr(p, self.link_method)(listener)
def tearDown(self): def tearDown(self):
LimitedTestCase.tearDown(self)
self.p.unlink() self.p.unlink()
def set_links(self, p, first_time, kill_exc_type): def set_links(self, p, first_time, kill_exc_type):
@@ -31,6 +117,7 @@ class TestCase(unittest.TestCase):
try: try:
self.link(p) self.link(p)
api.sleep(0)
except kill_exc_type: except kill_exc_type:
if first_time: if first_time:
raise raise
@@ -63,47 +150,33 @@ class TestCase(unittest.TestCase):
return event, myproc, proc_finished_flag, queue return event, myproc, proc_finished_flag, queue
def check_timed_out(self, event, myproc, proc_finished_flag, queue): def check_timed_out(self, event, myproc, proc_finished_flag, queue):
with timeout(DELAY, None): X = object()
event.wait() assert with_timeout(DELAY, event.wait, timeout_value=X) is X
raise AssertionError('should not get there') assert with_timeout(DELAY, queue.wait, timeout_value=X) is X
assert with_timeout(DELAY, proc.waitall, [myproc], timeout_value=X) is X
with timeout(DELAY, None):
queue.wait()
raise AssertionError('should not get there')
with timeout(DELAY, None):
print repr(proc.wait(myproc))
raise AssertionError('should not get there')
assert proc_finished_flag == [], proc_finished_flag assert proc_finished_flag == [], proc_finished_flag
class TestReturn_link(TestCase): class TestReturn_link(TestCase):
link_method = 'link' link_method = 'link'
def test_kill(self):
p = self.p = proc.spawn(sleep, DELAY)
self._test_return(p, True, proc.ProcKilled, proc.LinkedKilled, p.kill)
# repeating the same with dead process
for _ in xrange(3):
self._test_return(p, False, proc.ProcKilled, proc.LinkedKilled, p.kill)
def test_return(self): def test_return(self):
p = self.p = proc.spawn(lambda : 25) def return25():
self._test_return(p, True, int, proc.LinkedCompleted, lambda : sleep(0)) return 25
p = self.p = proc.spawn(return25)
self._test_return(p, True, 25, proc.LinkedCompleted, lambda : sleep(0))
# repeating the same with dead process # repeating the same with dead process
for _ in xrange(3): for _ in xrange(3):
self._test_return(p, False, int, proc.LinkedCompleted, lambda : sleep(0)) self._test_return(p, False, 25, proc.LinkedCompleted, lambda : sleep(0))
def _test_return(self, p, first_time, result_type, kill_exc_type, action): def _test_return(self, p, first_time, result, kill_exc_type, action):
event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
# stuff that will time out because there's no unhandled exception: # stuff that will time out because there's no unhandled exception:
#link_raise_event, link_raise_receiver, link_raise_flag, link_raise_queue = self.set_links_timeout(p.link_raise) xxxxx = self.set_links_timeout(p.link_exception)
xxxxx = self.set_links_timeout(p.link_raise)
action()
try: try:
sleep(DELAY) sleep(DELAY*2)
except kill_exc_type: except kill_exc_type:
assert first_time, 'raising here only first time' assert first_time, 'raising here only first time'
else: else:
@@ -111,30 +184,28 @@ class TestReturn_link(TestCase):
assert not p, p assert not p, p
with timeout(DELAY): self.assertEqual(event.wait(), result)
event_result = event.wait() self.assertEqual(queue.wait(), result)
queue_result = queue.wait() self.assertRaises(kill_exc_type, receiver.wait)
proc_result = proc.wait(receiver) self.assertRaises(kill_exc_type, proc.waitall, [receiver])
assert isinstance(event_result, result_type), repr(event_result)
assert isinstance(proc_result, kill_exc_type), repr(proc_result)
sleep(DELAY) sleep(DELAY)
assert not proc_flag, proc_flag assert not proc_flag, proc_flag
assert not callback_flag, callback_flag assert not callback_flag, callback_flag
self.check_timed_out(*xxxxx) self.check_timed_out(*xxxxx)
class TestReturn_link_return(TestReturn_link): class TestReturn_link_value(TestReturn_link):
sync = False sync = False
link_method = 'link_return' link_method = 'link_value'
class TestRaise_link(TestCase): class TestRaise_link(TestCase):
link_method = 'link' link_method = 'link'
def _test_raise(self, p, first_time, kill_exc_type=proc.LinkedFailed): def _test_raise(self, p, first_time, kill_exc_type):
event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
xxxxx = self.set_links_timeout(p.link_return) xxxxx = self.set_links_timeout(p.link_value)
try: try:
sleep(DELAY) sleep(DELAY)
@@ -145,12 +216,9 @@ class TestRaise_link(TestCase):
assert not p, p assert not p, p
with timeout(DELAY): self.assertRaises(ValueError, event.wait)
self.assertRaises(ValueError, event.wait) self.assertRaises(ValueError, queue.wait)
self.assertRaises(ValueError, queue.wait) self.assertRaises(kill_exc_type, proc.waitall, [receiver])
proc_result = proc.wait(receiver)
assert isinstance(proc_result, kill_exc_type), repr(proc_result)
sleep(DELAY) sleep(DELAY)
assert not proc_flag, proc_flag assert not proc_flag, proc_flag
assert not callback_flag, callback_flag assert not callback_flag, callback_flag
@@ -159,13 +227,44 @@ class TestRaise_link(TestCase):
def test_raise(self): def test_raise(self):
p = self.p = proc.spawn(int, 'badint') p = self.p = proc.spawn(int, 'badint')
self._test_raise(p, True) self._test_raise(p, True, proc.LinkedFailed)
# repeating the same with dead process # repeating the same with dead process
for _ in xrange(3): for _ in xrange(3):
self._test_raise(p, False) self._test_raise(p, False, proc.LinkedFailed)
class TestRaise_link_raise(TestCase): def _test_kill(self, p, first_time, kill_exc_type):
link_method = 'link_raise' event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
xxxxx = self.set_links_timeout(p.link_value)
p.kill()
try:
sleep(DELAY)
except kill_exc_type:
assert first_time, 'raising here only first time'
else:
assert not first_time, 'Should not raise LinkedKilled here after first time'
assert not p, p
self.assertRaises(proc.ProcExit, event.wait)
self.assertRaises(proc.ProcExit, queue.wait)
self.assertRaises(kill_exc_type, proc.waitall, [receiver])
sleep(DELAY)
assert not proc_flag, proc_flag
assert not callback_flag, callback_flag
self.check_timed_out(*xxxxx)
def test_kill(self):
p = self.p = proc.spawn(sleep, DELAY)
self._test_kill(p, True, proc.LinkedKilled)
# repeating the same with dead process
for _ in xrange(3):
self._test_kill(p, False, proc.LinkedKilled)
class TestRaise_link_exception(TestCase):
link_method = 'link_exception'
class TestStuff(unittest.TestCase): class TestStuff(unittest.TestCase):
@@ -174,8 +273,15 @@ class TestStuff(unittest.TestCase):
x = proc.spawn(lambda : 1) x = proc.spawn(lambda : 1)
y = proc.spawn(lambda : 2) y = proc.spawn(lambda : 2)
z = proc.spawn(lambda : 3) z = proc.spawn(lambda : 3)
self.assertEqual(proc.wait([x, y, z]), [1, 2, 3]) self.assertEqual(proc.waitall([x, y, z]), [1, 2, 3])
self.assertEqual([proc.wait(X) for X in [x, y, z]], [1, 2, 3]) e = coros.event()
x.link(e)
self.assertEqual(e.wait(), 1)
x.unlink(e)
e = coros.event()
x.link(e)
self.assertEqual(e.wait(), 1)
self.assertEqual([proc.waitall([X]) for X in [x, y, z]], [[1], [2], [3]])
def test_wait_error(self): def test_wait_error(self):
def x(): def x():
@@ -188,10 +294,10 @@ class TestStuff(unittest.TestCase):
x.link(y) x.link(y)
y.link(z) y.link(z)
z.link(y) z.link(y)
self.assertRaises(ValueError, proc.wait, [x, y, z]) self.assertRaises(ValueError, proc.waitall, [x, y, z])
assert isinstance(proc.wait(x), proc.LinkedFailed), repr(proc.wait(x)) self.assertRaises(proc.LinkedFailed, proc.waitall, [x])
self.assertEqual(proc.wait(z), 3) self.assertEqual(proc.waitall([z]), [3])
self.assertRaises(ValueError, proc.wait, y) self.assertRaises(ValueError, proc.waitall, [y])
def test_wait_all_exception_order(self): def test_wait_all_exception_order(self):
# if there're several exceptions raised, the earliest one must be raised by wait # if there're several exceptions raised, the earliest one must be raised by wait
@@ -201,14 +307,15 @@ class TestStuff(unittest.TestCase):
a = proc.spawn(badint) a = proc.spawn(badint)
b = proc.spawn(int, 'second') b = proc.spawn(int, 'second')
try: try:
proc.wait([a, b]) proc.waitall([a, b])
except ValueError, ex: except ValueError, ex:
assert 'second' in str(ex), repr(str(ex)) assert 'second' in str(ex), repr(str(ex))
def test_multiple_listeners_error(self): def test_multiple_listeners_error(self):
# if there was an error while calling a callback # if there was an error while calling a callback
# it should not prevent the other listeners from being called # it should not prevent the other listeners from being called
# (but all of them should be logged, check the output that they are) # also, all of the errors should be logged, check the output
# manually that they are
p = proc.spawn(lambda : 5) p = proc.spawn(lambda : 5)
results = [] results = []
def listener1(*args): def listener1(*args):
@@ -222,7 +329,7 @@ class TestStuff(unittest.TestCase):
p.link(listener1) p.link(listener1)
p.link(listener2) p.link(listener2)
p.link(listener3) p.link(listener3)
sleep(DELAY*3) sleep(DELAY*10)
assert results in [[10, 20], [20, 10]], results assert results in [[10, 20], [20, 10]], results
p = proc.spawn(int, 'hello') p = proc.spawn(int, 'hello')
@@ -230,16 +337,20 @@ class TestStuff(unittest.TestCase):
p.link(listener1) p.link(listener1)
p.link(listener2) p.link(listener2)
p.link(listener3) p.link(listener3)
sleep(DELAY*3) sleep(DELAY*10)
assert results in [[10, 20], [20, 10]], results assert results in [[10, 20], [20, 10]], results
def test_multiple_listeners_error_unlink(self): def test_multiple_listeners_error_unlink(self):
# notification must not happen after unlink even
# though notification process has been already started
p = proc.spawn(lambda : 5) p = proc.spawn(lambda : 5)
results = [] results = []
def listener1(*args): def listener1(*args):
p.unlink(listener2)
results.append(5) results.append(5)
1/0 1/0
def listener2(*args): def listener2(*args):
p.unlink(listener1)
results.append(5) results.append(5)
2/0 2/0
def listener3(*args): def listener3(*args):
@@ -247,16 +358,10 @@ class TestStuff(unittest.TestCase):
p.link(listener1) p.link(listener1)
p.link(listener2) p.link(listener2)
p.link(listener3) p.link(listener3)
sleep(0) sleep(DELAY*10)
# unlink one that is not fired yet
if listener1 in p._receivers:
p.unlink(listener1)
elif listener2 in p._receivers:
p.unlink(listener2)
sleep(DELAY*3)
assert results == [5], results assert results == [5], results
def FAILING_test_killing_unlinked(self): def test_killing_unlinked(self):
e = coros.event() e = coros.event()
def func(): def func():
try: try:
@@ -265,39 +370,14 @@ class TestStuff(unittest.TestCase):
e.send_exception(*sys.exc_info()) e.send_exception(*sys.exc_info())
p = proc.spawn_link(func) p = proc.spawn_link(func)
try: try:
e.wait() try:
except ZeroDivisionError: e.wait()
pass except ZeroDivisionError:
pass
finally: finally:
p.unlink() p.unlink()
sleep(DELAY) sleep(DELAY)
funcs_only_1arg = [lambda x: None,
lambda x=1: None]
funcs_only_3args = [lambda x, y, z: None,
lambda x, y, z=1: None]
funcs_any_arg = [lambda a, b=1, c=1: None,
lambda *args: None]
class TestCallbackTypeErrors(unittest.TestCase):
def test(self):
p = proc.spawn(lambda : None)
for func in funcs_only_1arg:
p.link_return(func)
self.assertRaises(TypeError, p.link_raise, func)
self.assertRaises(TypeError, p.link, func)
for func in funcs_only_3args:
p.link_raise(func)
self.assertRaises(TypeError, p.link_return, func)
self.assertRaises(TypeError, p.link, func)
for func in funcs_any_arg:
p.link_raise(func)
p.link_return(func)
p.link(func)
if __name__=='__main__': if __name__=='__main__':
unittest.main() unittest.main()

View File

@@ -1,7 +1,28 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""This test checks that socket instances (not GreenSockets but underlying sockets) """This test checks that socket instances (not GreenSockets but underlying sockets)
are not leaked by the hub. are not leaked by the hub.
""" """
import sys #import sys
import unittest import unittest
from eventlet.green import socket from eventlet.green import socket
from eventlet.green.thread import start_new_thread from eventlet.green.thread import start_new_thread

View File

@@ -0,0 +1,42 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest
from eventlet import api
if hasattr(api._threadlocal, 'hub'):
from eventlet.green import socket
else:
import socket
class TestSocketErrors(unittest.TestCase):
def test_connection_refused(self):
s = socket.socket()
try:
s.connect(('127.0.0.1', 81))
except socket.error, ex:
code, text = ex.args
assert code == 111, (code, text)
assert 'refused' in text.lower(), (code, text)
if __name__=='__main__':
unittest.main()

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest import unittest
from eventlet.api import call_after, spawn, sleep from eventlet.api import call_after, spawn, sleep

View File

@@ -1,7 +1,27 @@
# Copyright (c) 2008 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from twisted.internet import reactor from twisted.internet import reactor
from greentest import exit_unless_twisted from greentest import exit_unless_twisted
exit_unless_twisted() exit_unless_twisted()
import sys
import unittest import unittest
from twisted.internet.error import DNSLookupError from twisted.internet.error import DNSLookupError
from twisted.internet import defer from twisted.internet import defer

View File

@@ -1,3 +1,24 @@
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from twisted.internet import reactor from twisted.internet import reactor
from greentest import exit_unless_twisted from greentest import exit_unless_twisted
exit_unless_twisted() exit_unless_twisted()
@@ -10,31 +31,36 @@ import eventlet.twistedutil.protocol as pr
from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
from eventlet.api import spawn, sleep, with_timeout, call_after from eventlet.api import spawn, sleep, with_timeout, call_after
from eventlet.coros import event from eventlet.coros import event
from eventlet.green import socket
try:
from eventlet.green import socket
except SyntaxError:
socket = None
DELAY=0.01 DELAY=0.01
def setup_server_socket(self, delay=DELAY, port=0): if socket is not None:
s = socket.socket() def setup_server_socket(self, delay=DELAY, port=0):
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s = socket.socket()
s.bind(('127.0.0.1', port)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
port = s.getsockname()[1] s.bind(('127.0.0.1', port))
s.listen(5) port = s.getsockname()[1]
s.settimeout(delay*3) s.listen(5)
def serve(): s.settimeout(delay*3)
conn, addr = s.accept() def serve():
conn.settimeout(delay+1) conn, addr = s.accept()
try: conn.settimeout(delay+1)
hello = conn.makefile().readline()[:-2] try:
except socket.timeout: hello = conn.makefile().readline()[:-2]
return except socket.timeout:
conn.sendall('you said %s. ' % hello) return
sleep(delay) conn.sendall('you said %s. ' % hello)
conn.sendall('BYE') sleep(delay)
sleep(delay) conn.sendall('BYE')
#conn.close() sleep(delay)
spawn(serve) #conn.close()
return port spawn(serve)
return port
def setup_server_SpawnFactory(self, delay=DELAY, port=0): def setup_server_SpawnFactory(self, delay=DELAY, port=0):
def handle(conn): def handle(conn):
@@ -66,7 +92,7 @@ class TestCase(unittest.TestCase):
class TestUnbufferedTransport(TestCase): class TestUnbufferedTransport(TestCase):
gtransportClass = pr.UnbufferedTransport gtransportClass = pr.UnbufferedTransport
setup_server = setup_server_socket setup_server = setup_server_SpawnFactory
def test_full_read(self): def test_full_read(self):
self.conn.write('hello\r\n') self.conn.write('hello\r\n')
@@ -82,17 +108,9 @@ class TestUnbufferedTransport_bufsize1(TestUnbufferedTransport):
transportBufferSize = 1 transportBufferSize = 1
setup_server = setup_server_SpawnFactory setup_server = setup_server_SpawnFactory
class TestUnbufferedTransport_SpawnFactory(TestUnbufferedTransport):
setup_server = setup_server_SpawnFactory
class TestUnbufferedTransport_SpawnFactory_bufsize1(TestUnbufferedTransport):
transportBufferSize = 1
setup_server = setup_server_SpawnFactory
class TestGreenTransport(TestUnbufferedTransport): class TestGreenTransport(TestUnbufferedTransport):
gtransportClass = pr.GreenTransport gtransportClass = pr.GreenTransport
setup_server = setup_server_socket setup_server = setup_server_SpawnFactory
def test_read(self): def test_read(self):
self.conn.write('hello\r\n') self.conn.write('hello\r\n')
@@ -138,15 +156,8 @@ class TestGreenTransport(TestUnbufferedTransport):
class TestGreenTransport_bufsize1(TestGreenTransport): class TestGreenTransport_bufsize1(TestGreenTransport):
transportBufferSize = 1 transportBufferSize = 1
class TestGreenTransport_SpawnFactory(TestGreenTransport):
setup_server = setup_server_SpawnFactory
class TestGreenTransport_SpawnFactory_bufsize1(TestGreenTransport):
transportBufferSize = 1
setup_server = setup_server_SpawnFactory
class TestGreenTransportError(TestCase): class TestGreenTransportError(TestCase):
setup_server = setup_server_socket setup_server = setup_server_SpawnFactory
gtransportClass = pr.GreenTransport gtransportClass = pr.GreenTransport
def test_read_error(self): def test_read_error(self):
@@ -181,6 +192,23 @@ class TestGreenTransportError(TestCase):
# self.assertEqual('', self.conn.recv()) # self.assertEqual('', self.conn.recv())
# #
if socket is not None:
class TestUnbufferedTransport_socketserver(TestUnbufferedTransport):
setup_server = setup_server_socket
class TestUnbufferedTransport_socketserver_bufsize1(TestUnbufferedTransport):
transportBufferSize = 1
setup_server = setup_server_socket
class TestGreenTransport_socketserver(TestGreenTransport):
setup_server = setup_server_socket
class TestGreenTransport_socketserver_bufsize1(TestGreenTransport):
transportBufferSize = 1
setup_server = setup_server_socket
class TestTLSError(unittest.TestCase): class TestTLSError(unittest.TestCase):
def test_server_connectionMade_never_called(self): def test_server_connectionMade_never_called(self):

View File

@@ -0,0 +1,133 @@
# Test just the SSL support in the socket module, in a moderately bogus way.
import sys
from greentest import test_support
from eventlet.green import socket
import errno
import unittest
# Optionally test SSL support. This requires the 'network' resource as given
# on the regrtest command line.
skip_expected = not (test_support.is_resource_enabled('network') and
hasattr(socket, "ssl"))
def test_basic():
test_support.requires('network')
from eventlet.green import urllib
if test_support.verbose:
print "test_basic ..."
socket.RAND_status()
try:
socket.RAND_egd(1)
except TypeError:
pass
else:
print "didn't raise TypeError"
socket.RAND_add("this is a random string", 75.0)
try:
f = urllib.urlopen('https://sf.net')
except IOError, exc:
if exc.errno == errno.ETIMEDOUT:
raise test_support.ResourceDenied('HTTPS connection is timing out')
else:
raise
buf = f.read()
f.close()
def test_timeout():
test_support.requires('network')
def error_msg(extra_msg):
print >> sys.stderr, """\
WARNING: an attempt to connect to %r %s, in
test_timeout. That may be legitimate, but is not the outcome we hoped
for. If this message is seen often, test_timeout should be changed to
use a more reliable address.""" % (ADDR, extra_msg)
if test_support.verbose:
print "test_timeout ..."
# A service which issues a welcome banner (without need to write
# anything).
ADDR = "pop.gmail.com", 995
s = socket.socket()
s.settimeout(30.0)
try:
s.connect(ADDR)
except socket.timeout:
error_msg('timed out')
return
except socket.error, exc: # In case connection is refused.
if exc.args[0] == errno.ECONNREFUSED:
error_msg('was refused')
return
else:
raise
ss = socket.ssl(s)
# Read part of return welcome banner twice.
ss.read(1)
ss.read(1)
s.close()
def test_rude_shutdown():
if test_support.verbose:
print "test_rude_shutdown ..."
from eventlet.green import threading
# Some random port to connect to.
PORT = [9934]
listener_ready = threading.Event()
listener_gone = threading.Event()
# `listener` runs in a thread. It opens a socket listening on PORT, and
# sits in an accept() until the main thread connects. Then it rudely
# closes the socket, and sets Event `listener_gone` to let the main thread
# know the socket is gone.
def listener():
s = socket.socket()
PORT[0] = test_support.bind_port(s, '', PORT[0])
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect(('localhost', PORT[0]))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
except socket.sslerror:
pass
else:
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
class Test(unittest.TestCase):
test_basic = lambda self: test_basic()
test_timeout = lambda self: test_timeout()
test_rude_shutdown = lambda self: test_rude_shutdown()
def test_main():
if not hasattr(socket, "ssl"):
raise test_support.TestSkipped("socket module has no ssl support")
test_support.run_unittest(Test)
if __name__ == "__main__":
test_main()

View File

@@ -0,0 +1,11 @@
"""Test that BoundedSemaphore with a very high bound is as good as unbounded one"""
from eventlet import coros
from eventlet.green import thread
def allocate_lock():
return coros.semaphore(1, 9999)
thread.allocate_lock = allocate_lock
thread.LockType = coros.BoundedSemaphore
execfile('test_thread.py')

View File

@@ -1,4 +1,25 @@
#!/usr/bin/python #!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Execute python script with hub installed. """Execute python script with hub installed.
Usage: %prog [--hub HUB] [--reactor REACTOR] program.py Usage: %prog [--hub HUB] [--reactor REACTOR] program.py

View File

@@ -1,4 +1,25 @@
#!/usr/bin/python #!/usr/bin/python
# Copyright (c) 2008-2009 AG Projects
# Author: Denis Bilenko
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
""" """
Run Python script in a child process. Kill it after timeout has elapsed. Run Python script in a child process. Kill it after timeout has elapsed.
If the script was running unittest test cases, the timeouted test cases is If the script was running unittest test cases, the timeouted test cases is
@@ -99,7 +120,11 @@ def execf():
class TestCase(unittest.TestCase): class TestCase(unittest.TestCase):
base = unittest.TestCase base = unittest.TestCase
def run(self, result=None): def run(self, result=None):
name = "%s.%s" % (self.__class__.__name__, self._testMethodName) try:
testMethodName = self._testMethodName
except:
testMethodName = self.__testMethodName
name = "%s.%s" % (self.__class__.__name__, testMethodName)
if name in disabled_tests: if name in disabled_tests:
return return
print name, ' ' print name, ' '
@@ -123,11 +148,12 @@ while True:
os.unlink(CURRENT_TEST_FILENAME) os.unlink(CURRENT_TEST_FILENAME)
except: except:
pass pass
print '===ARGV=%r' % (sys.argv,)
print '===TIMEOUT=%r' % TIMEOUT
sys.stdout.flush()
child = os.fork() child = os.fork()
if child == 0: if child == 0:
print '===PYTHON=%s.%s.%s' % sys.version_info[:3]
print '===ARGV=%s' % ' '.join(sys.argv)
print '===TIMEOUT=%r' % TIMEOUT
sys.stdout.flush()
execf() execf()
break break
else: else: