From 6adaecdc76019f785a5256c36b2575cbf365ecf6 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sun, 21 Feb 2010 17:21:50 -0500 Subject: [PATCH 1/7] Fixed a few small things. --- eventlet/api.py | 4 ++-- eventlet/db_pool.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/eventlet/api.py b/eventlet/api.py index 1779f42..8100e7e 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -49,7 +49,7 @@ def tcp_listener(address, backlog=50): socket object on which one should call ``accept()`` to accept a connection on the newly bound socket. """ - warnings.warn("""eventlet.api.tcp_listener is deprecated. Please use eventlet.green.socket instead. See examples/echoserver.py for an example.""", + warnings.warn("""eventlet.api.tcp_listener is deprecated. Please use eventlet.listen instead.""", DeprecationWarning, stacklevel=2) from eventlet import greenio, util @@ -77,7 +77,7 @@ def connect_tcp(address, localaddr=None): Create a TCP connection to address ``(host, port)`` and return the socket. Optionally, bind to localaddr ``(host, port)`` first. """ - warnings.warn("""eventlet.api.connect_tcp is deprecated. Please use eventlet.green.socket instead. See examples/connect.py for an example.""", + warnings.warn("""eventlet.api.connect_tcp is deprecated. Please use eventlet.connect instead.""", DeprecationWarning, stacklevel=2) from eventlet import greenio, util diff --git a/eventlet/db_pool.py b/eventlet/db_pool.py index 61effe3..07b96ae 100644 --- a/eventlet/db_pool.py +++ b/eventlet/db_pool.py @@ -3,7 +3,6 @@ import sys import time from eventlet.pools import Pool -from eventlet.processes import DeadProcess from eventlet import timeout from eventlet import greenthread From 34171a348bac1b5427618dcfb203dd4c6c48b8d0 Mon Sep 17 00:00:00 2001 From: Chris AtLee Date: Sun, 21 Feb 2010 21:23:32 -0500 Subject: [PATCH 2/7] Improve coverage of debug module --- eventlet/debug.py | 4 +- eventlet/support/greenlets.py | 1 + tests/debug_test.py | 83 ++++++++++++++++++++++++++++++++++- 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/eventlet/debug.py b/eventlet/debug.py index 937e6af..5e44eeb 100644 --- a/eventlet/debug.py +++ b/eventlet/debug.py @@ -4,6 +4,8 @@ debugging Eventlet-powered applications.""" import os import sys import linecache +import string +import inspect __all__ = ['spew', 'unspew', 'format_hub_listeners', 'hub_listener_stacks', 'hub_exceptions', 'tpool_exceptions'] @@ -122,4 +124,4 @@ def tpool_exceptions(state): functions that are executed in it, in addition to raising them like it normally does.""" from eventlet import tpool - tpool.QUIET = not state \ No newline at end of file + tpool.QUIET = not state diff --git a/eventlet/support/greenlets.py b/eventlet/support/greenlets.py index 25bb60d..cb75667 100644 --- a/eventlet/support/greenlets.py +++ b/eventlet/support/greenlets.py @@ -4,6 +4,7 @@ try: GreenletExit = greenlet.GreenletExit greenlet = greenlet.greenlet except ImportError, e: + raise try: from py.magic import greenlet getcurrent = greenlet.getcurrent diff --git a/tests/debug_test.py b/tests/debug_test.py index 4afa349..db5e329 100644 --- a/tests/debug_test.py +++ b/tests/debug_test.py @@ -4,12 +4,90 @@ import eventlet from eventlet import debug from eventlet import api from tests import LimitedTestCase, main +from unittest import TestCase try: from cStringIO import StringIO except ImportError: from StringIO import StringIO +class TestSpew(TestCase): + def setUp(self): + self.orig_trace = sys.settrace + sys.settrace = self._settrace + self.tracer = None + + def tearDown(self): + sys.settrace = self.orig_trace + sys.stdout = sys.__stdout__ + + def _settrace(self, cb): + self.tracer = cb + + def test_spew(self): + debug.spew() + self.failUnless(isinstance(self.tracer, debug.Spew)) + + def test_unspew(self): + debug.spew() + debug.unspew() + self.failUnlessEqual(self.tracer, None) + + def test_line(self): + sys.stdout = StringIO() + s = debug.Spew() + f = sys._getframe() + s(f, "line", None) + lineno = f.f_lineno - 1 # -1 here since we called with frame f in the line above + output = sys.stdout.getvalue() + self.failUnless("debug_test:%i" % lineno in output, "Didn't find line %i in %s" % (lineno, output)) + self.failUnless("f= Date: Sun, 21 Feb 2010 21:39:30 -0500 Subject: [PATCH 3/7] Fix bug that Gene found in greenpipe. --- eventlet/greenio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index bcf03d9..63e42ec 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -416,7 +416,7 @@ class GreenPipe(object): chunk, self.recvbuffer = buf[:found], buf[found:] return chunk checked = max(0, len(buf) - (len(terminator) - 1)) - d = self.fd.read(BUFFER_SIZE) + d = self.read(BUFFER_SIZE) if not d: break buf += d @@ -428,7 +428,7 @@ class GreenPipe(object): chunk, self.recvbuffer = buf[:found], buf[found:] return chunk checked = len(buf) - d = self.fd.read(BUFFER_SIZE) + d = self.read(BUFFER_SIZE) if not d: break buf += d From 13f199da00461a9fa5b77bab4ed91918bcddcc95 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sun, 21 Feb 2010 22:31:23 -0500 Subject: [PATCH 4/7] Improved docs on pools for ericflo, fixes #40. --- eventlet/pools.py | 85 ++++++++++++++++++++++++++++++----------------- 1 file changed, 55 insertions(+), 30 deletions(-) diff --git a/eventlet/pools.py b/eventlet/pools.py index d79f7f3..73d8238 100644 --- a/eventlet/pools.py +++ b/eventlet/pools.py @@ -34,35 +34,41 @@ except ImportError: class Pool(object): """ - Pool is a base class that is meant to be subclassed. When subclassing, - define the :meth:`create` method to implement the desired resource. + Pool is a base class that implements resource limitation and construction. + It is meant to be subclassed. When subclassing, define only + the :meth:`create` method to implement the desired resource:: - When using the pool, if you do a get, you should **always** do a - :meth:`put`. + class MyPool(pools.Pool): + def create(self): + return MyObject() + + If using 2.5 or greater, the :meth:`item` method acts as a context manager; + that's the best way to use it:: + + with mypool.item() as thing: + thing.dostuff() + + If stuck on 2.4, the :meth:`get` and :meth:`put` methods are the preferred + nomenclature. Use a ``finally`` to ensure that nothing is leaked:: - The pattern is:: + thing = self.pool.get() + try: + thing.dostuff() + finally: + self.pool.put(thing) - thing = self.pool.get() - try: - thing.method() - finally: - self.pool.put(thing) - - The maximum size of the pool can be modified at runtime via the - :attr:`max_size` attribute. Adjusting this number does not affect existing - items checked out of the pool, nor on any waiters who are waiting for an - item to free up. Some indeterminate number of :meth:`get`/:meth:`put` - cycles will be necessary before the new maximum size truly matches the - actual operation of the pool. + The maximum size of the pool can be modified at runtime via + the :meth:`resize` method. + + Specifying a non-zero *min-size* argument pre-populates the pool with + *min_size* items. *max-size* sets a hard limit to the size of the pool -- + it cannot contain any more items than *max_size*, and if there are already + *max_size* items 'checked out' of the pool, the pool will cause any + greenthread calling :meth:`get` to cooperatively yield until an item + is :meth:`put` in. """ def __init__(self, min_size=0, max_size=4, order_as_stack=False): - """ Pre-populates the pool with *min_size* items. Sets a hard limit to - the size of the pool -- it cannot contain any more items than - *max_size*, and if there are already *max_size* items 'checked out' of - the pool, the pool will cause any getter to cooperatively yield until an - item is put in. - - *order_as_stack* governs the ordering of the items in the free pool. + """*order_as_stack* governs the ordering of the items in the free pool. If ``False`` (the default), the free items collection (of items that were created and were put back in the pool) acts as a round-robin, giving each item approximately equal utilization. If ``True``, the @@ -80,7 +86,8 @@ class Pool(object): self.free_items.append(self.create()) def get(self): - """Return an item from the pool, when one is available + """Return an item from the pool, when one is available. This may + cause the calling greenthread to block. """ if self.free_items: return self.free_items.popleft() @@ -89,12 +96,13 @@ class Pool(object): self.current_size += 1 return created return self.channel.get() - + if item_impl is not None: item = item_impl def put(self, item): - """Put an item back into the pool, when done + """Put an item back into the pool, when done. This may + cause the putting greenthread to block. """ if self.current_size > self.max_size: self.current_size -= 1 @@ -109,12 +117,19 @@ class Pool(object): self.free_items.append(item) def resize(self, new_size): - """Resize the pool + """Resize the pool to *new_size*. + + Adjusting this number does not affect existing items checked out of + the pool, nor on any greenthreads who are waiting for an item to free + up. Some indeterminate number of :meth:`get`/:meth:`put` + cycles will be necessary before the new maximum size truly matches + the actual operation of the pool. """ self.max_size = new_size def free(self): - """Return the number of free items in the pool. + """Return the number of free items in the pool. This corresponds + to the number of :meth:`get` calls needed to empty the pool. """ return len(self.free_items) + self.max_size - self.current_size @@ -124,7 +139,17 @@ class Pool(object): return max(0, self.channel.getting() - self.channel.putting()) def create(self): - """Generate a new pool item + """Generate a new pool item. This method must be overridden in order + for the pool to function. It accepts no arguments and returns a single + instance of whatever thing the pool is supposed to contain. + + In general, :meth:`create` is called whenever the pool exceeds its + previous high-water mark of concurrently-checked-out-items. In other + words, in a new pool with *min_size* of 0, the very first call + to :meth:`get` will result in a call to :meth:`create`. If the first + caller calls :meth:`put` before some other caller calls :meth:`get`, + then the first item will be returned, and :meth:`create` will not be + called a second time. """ raise NotImplementedError("Implement in subclass") From 330a5157f1c253e7f51edc737081a44612058de6 Mon Sep 17 00:00:00 2001 From: Eugene Oden Date: Sun, 21 Feb 2010 23:11:09 -0500 Subject: [PATCH 5/7] fixes for greenpipe - make readuntil() call self._recv() instead of self.fd.read() - make readline() terminate on '\n' as well as '\r\n' --- eventlet/greenio.py | 8 ++++---- tests/greenio_test.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 63e42ec..b95ab39 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -336,7 +336,7 @@ class GreenSocket(object): class GreenPipe(object): """ GreenPipe is a cooperatively-yielding wrapper around OS pipes. """ - newlines = '\r\n' + newlines = '\n' def __init__(self, fd): set_nonblocking(fd) self.fd = fd @@ -416,7 +416,7 @@ class GreenPipe(object): chunk, self.recvbuffer = buf[:found], buf[found:] return chunk checked = max(0, len(buf) - (len(terminator) - 1)) - d = self.read(BUFFER_SIZE) + d = self._recv(BUFFER_SIZE) if not d: break buf += d @@ -428,7 +428,7 @@ class GreenPipe(object): chunk, self.recvbuffer = buf[:found], buf[found:] return chunk checked = len(buf) - d = self.read(BUFFER_SIZE) + d = self._recv(BUFFER_SIZE) if not d: break buf += d @@ -574,4 +574,4 @@ def serve(sock, handle, concurrency=1000): connections until the existing ones complete. """ pass - \ No newline at end of file + diff --git a/tests/greenio_test.py b/tests/greenio_test.py index f956954..2cbaad4 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -259,7 +259,39 @@ class TestGreenIo(LimitedTestCase): self.assertEquals(result, 'sent via event') server.close() client.close() - + + def test_pipe_read(self): + # ensure that 'readline' works properly on GreenPipes when data is not + # immediately available (fd is nonblocking, was raising EAGAIN) + # also ensures that readline() terminates on '\n' and '\r\n' + r, w = os.pipe() + + r = os.fdopen(r) + w = os.fdopen(w, 'w') + + r = greenio.GreenPipe(r) + w = greenio.GreenPipe(w) + + def writer(): + eventlet.sleep(.1) + + w.write('line\n') + w.flush() + + w.write('line\r\n') + w.flush() + + gt = eventlet.spawn(writer) + + eventlet.sleep(0) + + line = r.readline() + self.assertEquals(line, 'line\n') + + line = r.readline() + self.assertEquals(line, 'line\r\n') + + gt.wait() class TestGreenIoLong(LimitedTestCase): TEST_TIMEOUT=10 # the test here might take a while depending on the OS From f07b25bedc2d75f8e837bd0d38f1d760589c3506 Mon Sep 17 00:00:00 2001 From: Chris AtLee Date: Mon, 22 Feb 2010 14:02:28 -0500 Subject: [PATCH 6/7] Bug 37: Implement queue resizing --- eventlet/queue.py | 9 +++++++++ tests/queue_test.py | 27 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/eventlet/queue.py b/eventlet/queue.py index 3963381..1bf3aef 100644 --- a/eventlet/queue.py +++ b/eventlet/queue.py @@ -181,6 +181,15 @@ class LightQueue(object): def qsize(self): """Return the size of the queue.""" return len(self.queue) + + def resize(self, size): + """Resizes the queue's maximum size. + + If the size is increased, and there are putters waiting, they may be woken up.""" + if size > self.maxsize: + # Maybe wake some stuff up + self._schedule_unlock() + self.maxsize = size def putting(self): """Returns the number of greenthreads that are blocked waiting to put diff --git a/tests/queue_test.py b/tests/queue_test.py index 99a713b..496a8cc 100644 --- a/tests/queue_test.py +++ b/tests/queue_test.py @@ -68,6 +68,33 @@ class TestQueue(LimitedTestCase): self.assertEquals(evt.wait(),'done') gt.wait() + def test_resize_up(self): + q = eventlet.Queue(0) + def sender(evt, q): + q.put('hi') + evt.send('done') + + evt = event.Event() + gt = eventlet.spawn(sender, evt, q) + eventlet.sleep(0) + self.assert_(not evt.ready()) + q.resize(1) + eventlet.sleep(0) + self.assert_(evt.ready()) + gt.wait() + + def test_resize_down(self): + size = 5 + q = eventlet.Queue(5) + + for i in range(5): + q.put(i) + + self.assertEquals(list(q.queue), range(5)) + q.resize(1) + eventlet.sleep(0) + self.assertEquals(list(q.queue), range(5)) + def test_multiple_waiters(self): # tests that multiple waiters get their results back q = eventlet.Queue() From cd20f974483357195c907b48e9a0b0ce82f23820 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 22 Feb 2010 16:52:02 -0500 Subject: [PATCH 7/7] Skipping saranwrap tests on pyevent hub....saranwrap's going to go away soon anyhow. --- tests/saranwrap_test.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/tests/saranwrap_test.py b/tests/saranwrap_test.py index 76ca96e..07bc2da 100644 --- a/tests/saranwrap_test.py +++ b/tests/saranwrap_test.py @@ -6,7 +6,7 @@ import eventlet import sys import tempfile import time -from tests import LimitedTestCase, main, skip_on_windows +from tests import LimitedTestCase, main, skip_on_windows, skip_with_pyevent import re import StringIO @@ -40,6 +40,7 @@ class TestSaranwrap(LimitedTestCase): self.assertEqual(0, prox.foo) @skip_on_windows + @skip_with_pyevent def test_wrap_tuple(self): my_tuple = (1, 2) prox = saranwrap.wrap(my_tuple) @@ -48,6 +49,7 @@ class TestSaranwrap(LimitedTestCase): self.assertEqual(len(my_tuple), 2) @skip_on_windows + @skip_with_pyevent def test_wrap_string(self): my_object = "whatever" prox = saranwrap.wrap(my_object) @@ -56,6 +58,7 @@ class TestSaranwrap(LimitedTestCase): self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b'])) @skip_on_windows + @skip_with_pyevent def test_wrap_uniterable(self): # here we're treating the exception as just a normal class prox = saranwrap.wrap(FloatingPointError()) @@ -68,6 +71,7 @@ class TestSaranwrap(LimitedTestCase): self.assertRaises(TypeError, key) @skip_on_windows + @skip_with_pyevent def test_wrap_dict(self): my_object = {'a':1} prox = saranwrap.wrap(my_object) @@ -78,6 +82,7 @@ class TestSaranwrap(LimitedTestCase): self.assertEqual('saran:' + `my_object`, `prox`) @skip_on_windows + @skip_with_pyevent def test_wrap_module_class(self): prox = saranwrap.wrap(re) self.assertEqual(saranwrap.Proxy, type(prox)) @@ -86,6 +91,7 @@ class TestSaranwrap(LimitedTestCase): self.assert_(repr(prox.compile)) @skip_on_windows + @skip_with_pyevent def test_wrap_eq(self): prox = saranwrap.wrap(re) exp1 = prox.compile('.') @@ -95,6 +101,7 @@ class TestSaranwrap(LimitedTestCase): self.assert_(exp1 != exp3) @skip_on_windows + @skip_with_pyevent def test_wrap_nonzero(self): prox = saranwrap.wrap(re) exp1 = prox.compile('.') @@ -103,6 +110,7 @@ class TestSaranwrap(LimitedTestCase): self.assert_(bool(prox2)) @skip_on_windows + @skip_with_pyevent def test_multiple_wraps(self): prox1 = saranwrap.wrap(re) prox2 = saranwrap.wrap(re) @@ -112,6 +120,7 @@ class TestSaranwrap(LimitedTestCase): x3 = prox2.compile('.') @skip_on_windows + @skip_with_pyevent def test_dict_passthru(self): prox = saranwrap.wrap(StringIO) x = prox.StringIO('a') @@ -120,22 +129,26 @@ class TestSaranwrap(LimitedTestCase): self.assertEqual(type(saranwrap.wrap(StringIO).StringIO('a').__dict__), saranwrap.ObjectProxy) @skip_on_windows + @skip_with_pyevent def test_is_value(self): server = saranwrap.Server(None, None, None) self.assert_(server.is_value(None)) @skip_on_windows + @skip_with_pyevent def test_wrap_getitem(self): prox = saranwrap.wrap([0,1,2]) self.assertEqual(prox[0], 0) @skip_on_windows + @skip_with_pyevent def test_wrap_setitem(self): prox = saranwrap.wrap([0,1,2]) prox[1] = 2 self.assertEqual(prox[1], 2) @skip_on_windows + @skip_with_pyevent def test_raising_exceptions(self): prox = saranwrap.wrap(re) def nofunc(): @@ -143,6 +156,7 @@ class TestSaranwrap(LimitedTestCase): self.assertRaises(AttributeError, nofunc) @skip_on_windows + @skip_with_pyevent def test_unpicklable_server_exception(self): prox = saranwrap.wrap(saranwrap) def unpickle(): @@ -154,6 +168,7 @@ class TestSaranwrap(LimitedTestCase): #self.assert_server_exists(prox) @skip_on_windows + @skip_with_pyevent def test_pickleable_server_exception(self): prox = saranwrap.wrap(saranwrap) def fperror(): @@ -163,12 +178,14 @@ class TestSaranwrap(LimitedTestCase): self.assert_server_exists(prox) @skip_on_windows + @skip_with_pyevent def test_print_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.print_string('hello') self.assert_server_exists(prox) @skip_on_windows + @skip_with_pyevent def test_stderr_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.err_string('goodbye') @@ -178,6 +195,7 @@ class TestSaranwrap(LimitedTestCase): self.assert_(a < b, "%s is not less than %s" % (a, b)) @skip_on_windows + @skip_with_pyevent def test_status(self): prox = saranwrap.wrap(time) a = prox.gmtime(0) @@ -197,6 +215,7 @@ class TestSaranwrap(LimitedTestCase): self.assert_(status['pid'] != saranwrap.status(prox2)['pid']) @skip_on_windows + @skip_with_pyevent def test_del(self): prox = saranwrap.wrap(time) delme = prox.gmtime(0) @@ -211,12 +230,14 @@ class TestSaranwrap(LimitedTestCase): self.assertLessThan(status_after['object_count'], status_before['object_count']) @skip_on_windows + @skip_with_pyevent def test_contains(self): prox = saranwrap.wrap({'a':'b'}) self.assert_('a' in prox) self.assert_('x' not in prox) @skip_on_windows + @skip_with_pyevent def test_variable_and_keyword_arguments_with_function_calls(self): import optparse prox = saranwrap.wrap(optparse) @@ -226,6 +247,7 @@ class TestSaranwrap(LimitedTestCase): self.assertEqual(opts.n, 'foo') @skip_on_windows + @skip_with_pyevent def test_original_proxy_going_out_of_scope(self): def make_re(): prox = saranwrap.wrap(re) @@ -249,6 +271,7 @@ class TestSaranwrap(LimitedTestCase): pass @skip_on_windows + @skip_with_pyevent def test_not_inheriting_pythonpath(self): # construct a fake module in the temp directory temp_dir = tempfile.mkdtemp("saranwrap_test") @@ -279,6 +302,7 @@ sys_path = sys.path""") sys.path.remove(temp_dir) @skip_on_windows + @skip_with_pyevent def test_contention(self): from tests import saranwrap_test prox = saranwrap.wrap(saranwrap_test) @@ -290,6 +314,7 @@ sys_path = sys.path""") pool.waitall() @skip_on_windows + @skip_with_pyevent def test_copy(self): import copy compound_object = {'a':[1,2,3]} @@ -304,6 +329,7 @@ sys_path = sys.path""") make_assertions(copy.deepcopy(prox)) @skip_on_windows + @skip_with_pyevent def test_list_of_functions(self): return # this test is known to fail, we can implement it sometime in the future if we wish from tests import saranwrap_test @@ -311,6 +337,7 @@ sys_path = sys.path""") self.assertEquals(list_maker(), prox[0]()) @skip_on_windows + @skip_with_pyevent def test_under_the_hood_coroutines(self): # so, we want to write a class which uses a coroutine to call # a function. Then we want to saranwrap that class, have @@ -330,6 +357,7 @@ sys_path = sys.path""") 'Coroutine in saranwrapped object did not run') @skip_on_windows + @skip_with_pyevent def test_child_process_death(self): prox = saranwrap.wrap({}) pid = saranwrap.getpid(prox) @@ -339,16 +367,19 @@ sys_path = sys.path""") self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist @skip_on_windows + @skip_with_pyevent def test_detection_of_server_crash(self): # make the server crash here pass @skip_on_windows + @skip_with_pyevent def test_equality_with_local_object(self): # we'll implement this if there's a use case for it pass @skip_on_windows + @skip_with_pyevent def test_non_blocking(self): # here we test whether it's nonblocking pass