Merge
This commit is contained in:
4
LICENSE
4
LICENSE
@@ -1,8 +1,8 @@
|
||||
Unless otherwise noted, the files in Eventlet are under the following MIT license:
|
||||
|
||||
Copyright (c) 2005-2006, Bob Ippolito
|
||||
Copyright (c) 2007-2009, Linden Research, Inc.
|
||||
Copyright (c) 2008-2009, Individual Contributors (see AUTHORS)
|
||||
Copyright (c) 2007-2010, Linden Research, Inc.
|
||||
Copyright (c) 2008-2010, Eventlet Contributors (see AUTHORS)
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
@@ -50,4 +50,13 @@ Though Eventlet has many modules, much of the most-used stuff is accessible simp
|
||||
Timeout objects are context managers, and so can be used in with statements.
|
||||
See :class:`Timeout <eventlet.timeout.Timeout>` for more details.
|
||||
|
||||
.. function:: eventlet.import_patched(modulename, *additional_modules, **kw_additional_modules)
|
||||
|
||||
Imports a module in a way that ensures that the module uses "green" versions of the standard library modules, so that everything works nonblockingly. The only required argument is the name of the module to be imported. For more information see :ref:`import-green`.
|
||||
|
||||
.. function:: eventlet.monkey_patch(all=True, os=False, select=False, socket=False, thread=False, time=False)
|
||||
|
||||
Globally patches certain system modules to be greenthread-friendly. The keyword arguments afford some control over which modules are patched. If *all* is True, then all modules are patched regardless of the other arguments. If it's False, then the rest of the keyword arguments control patching of specific subsections of the standard library. Most patch the single module of the same name (os, time, select). The exceptions are socket, which also patches the ssl module if present; and thread, which patches thread, threading, and Queue. It's safe to call monkey_patch multiple times. For more information see :ref:`monkey-patch`.
|
||||
|
||||
|
||||
These are the basic primitives of Eventlet; there are a lot more out there in the other Eventlet modules; check out the :doc:`modules`.
|
||||
|
@@ -43,7 +43,7 @@ master_doc = 'index'
|
||||
|
||||
# General information about the project.
|
||||
project = u'Eventlet'
|
||||
copyright = u'2009, Eventlet Contributors'
|
||||
copyright = u'2005-2010, Eventlet Contributors'
|
||||
|
||||
# The version info for the project you're documenting, acts as replacement for
|
||||
# |version| and |release|, also used in various other places throughout the
|
||||
|
@@ -76,8 +76,7 @@ One common use case that Linden Lab runs into all the time is a "dispatch" desig
|
||||
Here's a somewhat contrived example: a server that receives POSTs from clients that contain a list of urls of RSS feeds. The server fetches all the feeds concurrently and responds with a list of their titles to the client. It's easy to imagine it doing something more complex than this, and this could be easily modified to become a Reader-style application::
|
||||
|
||||
import eventlet
|
||||
from eventlet import patcher
|
||||
feedparser = patcher.import_patched('feedparser')
|
||||
feedparser = eventlet.import_patched('feedparser')
|
||||
|
||||
pool = eventlet.GreenPool()
|
||||
|
||||
|
@@ -1,6 +1,11 @@
|
||||
:mod:`semaphore` -- Semaphore classes
|
||||
==================================================
|
||||
|
||||
.. automodule:: eventlet.semaphore
|
||||
.. autoclass:: eventlet.semaphore.Semaphore
|
||||
:members:
|
||||
:undoc-members:
|
||||
|
||||
.. autoclass:: eventlet.semaphore.BoundedSemaphore
|
||||
:members:
|
||||
|
||||
.. autoclass:: eventlet.semaphore.CappedSemaphore
|
||||
:members:
|
@@ -16,14 +16,14 @@ The first way of greening an application is to import networking-related librari
|
||||
from eventlet.green import threading
|
||||
from eventlet.green import asyncore
|
||||
|
||||
This works best if every library can be imported green in this manner. If ``eventlet.green`` lacks a module (for example, non-python-standard modules), then the :mod:`eventlet.patcher` module can come to the rescue. It provides a function, :func:`eventlet.patcher.import_patched`, that greens any module on import.
|
||||
This works best if every library can be imported green in this manner. If ``eventlet.green`` lacks a module (for example, non-python-standard modules), then :func:`~eventlet.patcher.import_patched` function can come to the rescue. It is a replacement for the builtin import statement that greens any module on import.
|
||||
|
||||
.. function:: eventlet.patcher.import_patched(module_name, *additional_modules, **kw_additional_modules)
|
||||
|
||||
Imports a module in a greened manner, so that the module's use of networking libraries like socket will use Eventlet's green versions instead. The only required argument is the name of the module to be imported::
|
||||
|
||||
from eventlet import patcher
|
||||
httplib2 = patcher.import_patched('httplib2')
|
||||
import eventlet
|
||||
httplib2 = eventlet.import_patched('httplib2')
|
||||
|
||||
Under the hood, it works by temporarily swapping out the "normal" versions of the libraries in sys.modules for an eventlet.green equivalent. When the import of the to-be-patched module completes, the state of sys.modules is restored. Therefore, if the patched module contains the statement 'import socket', import_patched will have it reference eventlet.green.socket. One weakness of this approach is that it doesn't work for late binding (i.e. imports that happen during runtime). Late binding of imports is fortunately rarely done (it's slow and against `PEP-8 <http://www.python.org/dev/peps/pep-0008/>`_), so in most cases import_patched will work just fine.
|
||||
|
||||
@@ -31,12 +31,13 @@ This works best if every library can be imported green in this manner. If ``eve
|
||||
|
||||
from eventlet.green import socket
|
||||
from eventlet.green import SocketServer
|
||||
BaseHTTPServer = patcher.import_patched('BaseHTTPServer',
|
||||
BaseHTTPServer = eventlet.import_patched('BaseHTTPServer',
|
||||
('socket', socket),
|
||||
('SocketServer', SocketServer))
|
||||
BaseHTTPServer = patcher.import_patched('BaseHTTPServer',
|
||||
BaseHTTPServer = eventlet.import_patched('BaseHTTPServer',
|
||||
socket=socket, SocketServer=SocketServer)
|
||||
|
||||
.. _monkey-patch:
|
||||
|
||||
Monkeypatching the Standard Library
|
||||
----------------------------------------
|
||||
@@ -50,7 +51,7 @@ library. This has the disadvantage of appearing quite magical, but the advantag
|
||||
|
||||
Here's an example of using monkey_patch to patch only a few modules::
|
||||
|
||||
from eventlet import patcher
|
||||
patcher.monkey_patch(all=False, socket=True, select=True)
|
||||
import eventlet
|
||||
eventlet.monkey_patch(all=False, socket=True, select=True)
|
||||
|
||||
It is important to call :func:`eventlet.patcher.monkey_patch` as early in the lifetime of the application as possible. Try to do it as one of the first lines in the main module. The reason for this is that sometimes there is a class that inherits from a class that needs to be greened -- e.g. a class that inherits from socket.socket -- and inheritance is done at import time, so therefore the monkeypatching should happen before the derived class is defined. It's safe to call monkey_patch multiple times.
|
||||
It is important to call :func:`~eventlet.patcher.monkey_patch` as early in the lifetime of the application as possible. Try to do it as one of the first lines in the main module. The reason for this is that sometimes there is a class that inherits from a class that needs to be greened -- e.g. a class that inherits from socket.socket -- and inheritance is done at import time, so therefore the monkeypatching should happen before the derived class is defined. It's safe to call monkey_patch multiple times.
|
@@ -17,6 +17,9 @@ __all__ = [
|
||||
'ssl_listener', 'tcp_listener', 'trampoline',
|
||||
'unspew', 'use_hub', 'with_timeout', 'timeout']
|
||||
|
||||
warnings.warn("eventlet.api is deprecated! Nearly everything in it has moved "
|
||||
"to the eventlet module.", DeprecationWarning, stacklevel=2)
|
||||
|
||||
def get_hub(*a, **kw):
|
||||
warnings.warn("eventlet.api.get_hub has moved to eventlet.hubs.get_hub",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
|
@@ -1,8 +1,8 @@
|
||||
from eventlet import api
|
||||
from eventlet import greenthread
|
||||
|
||||
def get_ident():
|
||||
""" Returns ``id()`` of current greenlet. Useful for debugging."""
|
||||
return id(api.getcurrent())
|
||||
return id(greenthread.getcurrent())
|
||||
|
||||
# TODO: The base threadlocal class wants to call __init__ on itself for every new thread that associates with it; our corolocal doesn't do this, but should for 100% compatibility. The implementation in _threading_local.py is so janky....
|
||||
class local(object):
|
||||
|
@@ -1,8 +1,6 @@
|
||||
|
||||
|
||||
__select = __import__('select')
|
||||
error = __select.error
|
||||
from eventlet.api import getcurrent
|
||||
from eventlet.greenthread import getcurrent
|
||||
from eventlet.hubs import get_hub
|
||||
|
||||
__patched__ = ['select']
|
||||
|
@@ -2,4 +2,4 @@ __time = __import__('time')
|
||||
for var in dir(__time):
|
||||
exec "%s = __time.%s" % (var, var)
|
||||
__patched__ = ['sleep']
|
||||
from eventlet.api import sleep
|
||||
from eventlet.greenthread import sleep
|
||||
|
@@ -7,7 +7,7 @@ from eventlet.hubs import timer
|
||||
from eventlet.support import greenlets as greenlet
|
||||
import warnings
|
||||
|
||||
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'call_after_global', 'call_after_local', 'GreenThread']
|
||||
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'spawn_after', 'spawn_after_local', 'GreenThread']
|
||||
|
||||
getcurrent = greenlet.getcurrent
|
||||
|
||||
@@ -16,7 +16,7 @@ def sleep(seconds=0):
|
||||
elapsed.
|
||||
|
||||
*seconds* may be specified as an integer, or a float if fractional seconds
|
||||
are desired. Calling :func:`~eventlet.api.sleep` with *seconds* of 0 is the
|
||||
are desired. Calling :func:`~greenthread.sleep` with *seconds* of 0 is the
|
||||
canonical way of expressing a cooperative yield. For example, if one is
|
||||
looping over a large list performing an expensive calculation without
|
||||
calling any socket methods, it's a good idea to call ``sleep(0)``
|
||||
@@ -73,9 +73,9 @@ def spawn_after(seconds, func, *args, **kwargs):
|
||||
|
||||
To cancel the spawn and prevent *func* from being called,
|
||||
call :meth:`GreenThread.cancel` on the return value of :func:`spawn_after`.
|
||||
This will not abort the function if it's already started running. If
|
||||
terminating *func* regardless of whether it's started or not is the desired
|
||||
behavior, call :meth:`GreenThread.kill`.
|
||||
This will not abort the function if it's already started running, which is
|
||||
generally the desired behavior. If terminating *func* regardless of whether
|
||||
it's started or not is the desired behavior, call :meth:`GreenThread.kill`.
|
||||
"""
|
||||
hub = hubs.get_hub()
|
||||
g = GreenThread(hub.greenlet)
|
||||
@@ -107,14 +107,6 @@ def spawn_after_local(seconds, func, *args, **kwargs):
|
||||
|
||||
|
||||
def call_after_global(seconds, func, *args, **kwargs):
|
||||
"""Schedule *function* to be called after *seconds* have elapsed.
|
||||
The function will be scheduled even if the current greenlet has exited.
|
||||
|
||||
*seconds* may be specified as an integer, or a float if fractional seconds
|
||||
are desired. The *function* will be called with the given *args* and
|
||||
keyword arguments *kwargs*, and will be executed within its own greenthread.
|
||||
|
||||
Its return value is discarded."""
|
||||
warnings.warn("call_after_global is renamed to spawn_after, which"
|
||||
"has the same signature and semantics (plus a bit extra). Please do a"
|
||||
" quick search-and-replace on your codebase, thanks!",
|
||||
@@ -123,15 +115,6 @@ def call_after_global(seconds, func, *args, **kwargs):
|
||||
|
||||
|
||||
def call_after_local(seconds, function, *args, **kwargs):
|
||||
"""Schedule *function* to be called after *seconds* have elapsed.
|
||||
The function will NOT be called if the current greenthread has exited.
|
||||
|
||||
*seconds* may be specified as an integer, or a float if fractional seconds
|
||||
are desired. The *function* will be called with the given *args* and
|
||||
keyword arguments *kwargs*, and will be executed within its own greenthread.
|
||||
|
||||
Its return value is discarded.
|
||||
"""
|
||||
warnings.warn("call_after_local is renamed to spawn_after_local, which"
|
||||
"has the same signature and semantics (plus a bit extra).",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
@@ -145,30 +128,6 @@ call_after = call_after_local
|
||||
|
||||
|
||||
def exc_after(seconds, *throw_args):
|
||||
"""Schedule an exception to be raised into the current coroutine
|
||||
after *seconds* have elapsed.
|
||||
|
||||
This only works if the current coroutine is yielding, and is generally
|
||||
used to set timeouts after which a network operation or series of
|
||||
operations will be canceled.
|
||||
|
||||
Returns a :class:`~eventlet.timer.Timer` object with a
|
||||
:meth:`~eventlet.timer.Timer.cancel` method which should be used to
|
||||
prevent the exception if the operation completes successfully.
|
||||
|
||||
See also :func:`~eventlet.api.with_timeout` that encapsulates the idiom below.
|
||||
|
||||
Example::
|
||||
|
||||
def read_with_timeout():
|
||||
timer = api.exc_after(30, RuntimeError())
|
||||
try:
|
||||
httpc.get('http://www.google.com/')
|
||||
except RuntimeError:
|
||||
print "Timed out!"
|
||||
else:
|
||||
timer.cancel()
|
||||
"""
|
||||
warnings.warn("Instead of exc_after, which is deprecated, use "
|
||||
"Timeout(seconds, exception)",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
@@ -270,9 +229,14 @@ def cancel(g, *throw_args):
|
||||
|
||||
def kill(g, *throw_args):
|
||||
"""Terminates the target greenthread by raising an exception into it.
|
||||
Whatever that greenthread might be doing; be it waiting for I/O or another
|
||||
primitive, it sees an exception right away.
|
||||
|
||||
By default, this exception is GreenletExit, but a specific exception
|
||||
may be specified. *throw_args* should be the same as the arguments to
|
||||
raise; either an exception instance or an exc_info tuple.
|
||||
|
||||
Calling :func:`kill` causes the calling greenthread to cooperatively yield.
|
||||
"""
|
||||
if g.dead:
|
||||
return
|
||||
|
@@ -1,6 +1,5 @@
|
||||
import collections
|
||||
|
||||
from eventlet import api
|
||||
from eventlet import queue
|
||||
|
||||
__all__ = ['Pool', 'TokenPool']
|
||||
|
@@ -4,7 +4,12 @@ import struct
|
||||
import sys
|
||||
|
||||
from eventlet.processes import Process, DeadProcess
|
||||
from eventlet import api, pools
|
||||
from eventlet import pools
|
||||
|
||||
import warnings
|
||||
warnings.warn("eventlet.saranwrap is deprecated due to underuse. If you love "
|
||||
"it, let us know by emailing eventletdev@lists.secondlife.com",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
|
||||
# debugging hooks
|
||||
_g_debug_mode = False
|
||||
@@ -31,9 +36,9 @@ def wrap(obj, dead_callback = None):
|
||||
return wrap_module(obj.__name__, dead_callback)
|
||||
pythonpath_sync()
|
||||
if _g_debug_mode:
|
||||
p = Process(sys.executable, [__file__, '--child', '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback)
|
||||
p = Process(sys.executable, ["-W", "ignore", __file__, '--child', '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback)
|
||||
else:
|
||||
p = Process(sys.executable, [__file__, '--child'], dead_callback)
|
||||
p = Process(sys.executable, ["-W", "ignore", __file__, '--child'], dead_callback)
|
||||
prox = Proxy(ChildProcess(p, p))
|
||||
prox.obj = obj
|
||||
return prox.obj
|
||||
@@ -48,9 +53,9 @@ def wrap_module(fqname, dead_callback = None):
|
||||
pythonpath_sync()
|
||||
global _g_debug_mode
|
||||
if _g_debug_mode:
|
||||
p = Process(sys.executable, [__file__, '--module', fqname, '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback)
|
||||
p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname, '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback)
|
||||
else:
|
||||
p = Process(sys.executable, [__file__, '--module', fqname,], dead_callback)
|
||||
p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname,], dead_callback)
|
||||
prox = Proxy(ChildProcess(p,p))
|
||||
return prox
|
||||
|
||||
@@ -593,6 +598,41 @@ def print_string(str):
|
||||
def err_string(str):
|
||||
print >>sys.stderr, str
|
||||
|
||||
def named(name):
|
||||
"""Return an object given its name.
|
||||
|
||||
The name uses a module-like syntax, eg::
|
||||
|
||||
os.path.join
|
||||
|
||||
or::
|
||||
|
||||
mulib.mu.Resource
|
||||
"""
|
||||
toimport = name
|
||||
obj = None
|
||||
import_err_strings = []
|
||||
while toimport:
|
||||
try:
|
||||
obj = __import__(toimport)
|
||||
break
|
||||
except ImportError, err:
|
||||
# print 'Import error on %s: %s' % (toimport, err) # debugging spam
|
||||
import_err_strings.append(err.__str__())
|
||||
toimport = '.'.join(toimport.split('.')[:-1])
|
||||
if obj is None:
|
||||
raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings))
|
||||
for seg in name.split('.')[1:]:
|
||||
try:
|
||||
obj = getattr(obj, seg)
|
||||
except AttributeError:
|
||||
dirobj = dir(obj)
|
||||
dirobj.sort()
|
||||
raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % (
|
||||
seg, obj, dirobj, name, import_err_strings))
|
||||
return obj
|
||||
|
||||
|
||||
def main():
|
||||
import optparse
|
||||
parser = optparse.OptionParser(
|
||||
@@ -617,7 +657,7 @@ def main():
|
||||
if options.module:
|
||||
def get_module():
|
||||
if base_obj[0] is None:
|
||||
base_obj[0] = api.named(options.module)
|
||||
base_obj[0] = named(options.module)
|
||||
return base_obj[0]
|
||||
server = Server(tpool.Proxy(sys.stdin),
|
||||
tpool.Proxy(sys.stdout),
|
||||
|
@@ -5,12 +5,25 @@ class Semaphore(object):
|
||||
"""An unbounded semaphore.
|
||||
Optionally initialize with a resource *count*, then :meth:`acquire` and
|
||||
:meth:`release` resources as needed. Attempting to :meth:`acquire` when
|
||||
*count* is zero suspends the calling coroutine until *count* becomes
|
||||
*count* is zero suspends the calling greenthread until *count* becomes
|
||||
nonzero again.
|
||||
|
||||
This is API-compatible with :class:`threading.Semaphore`.
|
||||
|
||||
It is a context manager, and thus can be used in a with block::
|
||||
|
||||
sem = Semaphore(2)
|
||||
with sem:
|
||||
do_some_stuff()
|
||||
|
||||
If not specified, *value* defaults to 1.
|
||||
"""
|
||||
|
||||
def __init__(self, count=0):
|
||||
self.counter = count
|
||||
def __init__(self, value=1):
|
||||
self.counter = value
|
||||
if value < 0:
|
||||
raise ValueError("Semaphore must be initialized with a positive "
|
||||
"number, got %s" % value)
|
||||
self._waiters = set()
|
||||
|
||||
def __repr__(self):
|
||||
@@ -22,13 +35,31 @@ class Semaphore(object):
|
||||
return '<%s c=%s _w[%s]>' % params
|
||||
|
||||
def locked(self):
|
||||
""" Returns true if a call to acquire would block."""
|
||||
return self.counter <= 0
|
||||
|
||||
def bounded(self):
|
||||
# for consistency with BoundedSemaphore
|
||||
""" Returns False; for consistency with :class:`~eventlet.semaphore.CappedSemaphore`."""
|
||||
return False
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
"""Acquire a semaphore.
|
||||
|
||||
When invoked without arguments: if the internal counter is larger than
|
||||
zero on entry, decrement it by one and return immediately. If it is zero
|
||||
on entry, block, waiting until some other thread has called release() to
|
||||
make it larger than zero. This is done with proper interlocking so that
|
||||
if multiple acquire() calls are blocked, release() will wake exactly one
|
||||
of them up. The implementation may pick one at random, so the order in
|
||||
which blocked threads are awakened should not be relied on. There is no
|
||||
return value in this case.
|
||||
|
||||
When invoked with blocking set to true, do the same thing as when called
|
||||
without arguments, and return true.
|
||||
|
||||
When invoked with blocking set to false, do not block. If a call without
|
||||
an argument would block, return false immediately; otherwise, do the
|
||||
same thing as when called without arguments, and return true."""
|
||||
if not blocking and self.locked():
|
||||
return False
|
||||
if self.counter <= 0:
|
||||
@@ -45,7 +76,12 @@ class Semaphore(object):
|
||||
self.acquire()
|
||||
|
||||
def release(self, blocking=True):
|
||||
# `blocking' parameter is for consistency with BoundedSemaphore and is ignored
|
||||
"""Release a semaphore, incrementing the internal counter by one. When
|
||||
it was zero on entry and another thread is waiting for it to become
|
||||
larger than zero again, wake up that thread.
|
||||
|
||||
The *blocking* argument is for consistency with CappedSemaphore and is
|
||||
ignored"""
|
||||
self.counter += 1
|
||||
if self._waiters:
|
||||
hubs.get_hub().schedule_call_global(0, self._do_acquire)
|
||||
@@ -61,22 +97,69 @@ class Semaphore(object):
|
||||
|
||||
@property
|
||||
def balance(self):
|
||||
"""An integer value that represents how many new calls to
|
||||
:meth:`acquire` or :meth:`release` would be needed to get the counter to
|
||||
0. If it is positive, then its value is the number of acquires that can
|
||||
happen before the next acquire would block. If it is negative, it is
|
||||
the negative of the number of releases that would be required in order
|
||||
to make the counter 0 again (one more release would push the counter to
|
||||
1 and unblock acquirers). It takes into account how many greenthreads
|
||||
are currently blocking in :meth:`acquire`.
|
||||
"""
|
||||
# positive means there are free items
|
||||
# zero means there are no free items but nobody has requested one
|
||||
# negative means there are requests for items, but no items
|
||||
return self.counter - len(self._waiters)
|
||||
|
||||
|
||||
class BoundedSemaphore(object):
|
||||
"""A bounded semaphore.
|
||||
class BoundedSemaphore(Semaphore):
|
||||
"""A bounded semaphore checks to make sure its current value doesn't exceed
|
||||
its initial value. If it does, ValueError is raised. In most situations
|
||||
semaphores are used to guard resources with limited capacity. If the
|
||||
semaphore is released too many times it's a sign of a bug. If not given,
|
||||
*value* defaults to 1."""
|
||||
def __init__(self, value=1):
|
||||
super(BoundedSemaphore, self).__init__(value)
|
||||
self.original_counter = value
|
||||
|
||||
def release(self, blocking=True):
|
||||
"""Release a semaphore, incrementing the internal counter by one. If
|
||||
the counter would exceed the initial value, raises ValueError. When
|
||||
it was zero on entry and another thread is waiting for it to become
|
||||
larger than zero again, wake up that thread.
|
||||
|
||||
The *blocking* argument is for consistency with :class:`CappedSemaphore`
|
||||
and is ignored"""
|
||||
if self.counter >= self.original_counter:
|
||||
raise ValueError, "Semaphore released too many times"
|
||||
return super(BoundedSemaphore, self).release(blocking)
|
||||
|
||||
class CappedSemaphore(object):
|
||||
"""A blockingly bounded semaphore.
|
||||
|
||||
Optionally initialize with a resource *count*, then :meth:`acquire` and
|
||||
:meth:`release` resources as needed. Attempting to :meth:`acquire` when
|
||||
*count* is zero suspends the calling coroutine until count becomes nonzero
|
||||
*count* is zero suspends the calling greenthread until count becomes nonzero
|
||||
again. Attempting to :meth:`release` after *count* has reached *limit*
|
||||
suspends the calling coroutine until *count* becomes less than *limit*
|
||||
suspends the calling greenthread until *count* becomes less than *limit*
|
||||
again.
|
||||
|
||||
This has the same API as :class:`threading.Semaphore`, though its
|
||||
semantics and behavior differ subtly due to the upper limit on calls
|
||||
to :meth:`release`. It is **not** compatible with
|
||||
:class:`threading.BoundedSemaphore` because it blocks when reaching *limit*
|
||||
instead of raising a ValueError.
|
||||
|
||||
It is a context manager, and thus can be used in a with block::
|
||||
|
||||
sem = CappedSemaphore(2)
|
||||
with sem:
|
||||
do_some_stuff()
|
||||
"""
|
||||
def __init__(self, count, limit):
|
||||
if count < 0:
|
||||
raise ValueError("CappedSemaphore must be initialized with a "
|
||||
"positive number, got %s" % count)
|
||||
if count > limit:
|
||||
# accidentally, this also catches the case when limit is None
|
||||
raise ValueError("'count' cannot be more than 'limit'")
|
||||
@@ -92,12 +175,31 @@ class BoundedSemaphore(object):
|
||||
return '<%s b=%s l=%s u=%s>' % params
|
||||
|
||||
def locked(self):
|
||||
"""Returns true if a call to acquire would block."""
|
||||
return self.lower_bound.locked()
|
||||
|
||||
def bounded(self):
|
||||
def bounded(self):
|
||||
"""Returns true if a call to release would block."""
|
||||
return self.upper_bound.locked()
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
"""Acquire a semaphore.
|
||||
|
||||
When invoked without arguments: if the internal counter is larger than
|
||||
zero on entry, decrement it by one and return immediately. If it is zero
|
||||
on entry, block, waiting until some other thread has called release() to
|
||||
make it larger than zero. This is done with proper interlocking so that
|
||||
if multiple acquire() calls are blocked, release() will wake exactly one
|
||||
of them up. The implementation may pick one at random, so the order in
|
||||
which blocked threads are awakened should not be relied on. There is no
|
||||
return value in this case.
|
||||
|
||||
When invoked with blocking set to true, do the same thing as when called
|
||||
without arguments, and return true.
|
||||
|
||||
When invoked with blocking set to false, do not block. If a call without
|
||||
an argument would block, return false immediately; otherwise, do the
|
||||
same thing as when called without arguments, and return true."""
|
||||
if not blocking and self.locked():
|
||||
return False
|
||||
self.upper_bound.release()
|
||||
@@ -114,6 +216,12 @@ class BoundedSemaphore(object):
|
||||
self.acquire()
|
||||
|
||||
def release(self, blocking=True):
|
||||
"""Release a semaphore. In this class, this behaves very much like
|
||||
an :meth:`acquire` but in the opposite direction.
|
||||
|
||||
Imagine the docs of :meth:`acquire` here, but with every direction
|
||||
reversed. When calling this method, it will block if the internal
|
||||
counter is greater than or equal to *limit*."""
|
||||
if not blocking and self.bounded():
|
||||
return False
|
||||
self.lower_bound.release()
|
||||
@@ -128,4 +236,12 @@ class BoundedSemaphore(object):
|
||||
|
||||
@property
|
||||
def balance(self):
|
||||
"""An integer value that represents how many new calls to
|
||||
:meth:`acquire` or :meth:`release` would be needed to get the counter to
|
||||
0. If it is positive, then its value is the number of acquires that can
|
||||
happen before the next acquire would block. If it is negative, it is
|
||||
the negative of the number of releases that would be required in order
|
||||
to make the counter 0 again (one more release would push the counter to
|
||||
1 and unblock acquirers). It takes into account how many greenthreads
|
||||
are currently blocking in :meth:`acquire` and :meth:`release`."""
|
||||
return self.lower_bound.balance - self.upper_bound.balance
|
@@ -2,8 +2,7 @@
|
||||
and returns the titles of those feeds.
|
||||
"""
|
||||
import eventlet
|
||||
from eventlet import patcher
|
||||
feedparser = patcher.import_patched('feedparser')
|
||||
feedparser = eventlet.import_patched('feedparser')
|
||||
|
||||
# the pool provides a safety limit on our concurrency
|
||||
pool = eventlet.GreenPool()
|
||||
|
@@ -5,7 +5,7 @@ from tests import LimitedTestCase
|
||||
|
||||
class TestSemaphore(LimitedTestCase):
|
||||
def test_bounded(self):
|
||||
sem = semaphore.BoundedSemaphore(2, limit=3)
|
||||
sem = semaphore.CappedSemaphore(2, limit=3)
|
||||
self.assertEqual(sem.acquire(), True)
|
||||
self.assertEqual(sem.acquire(), True)
|
||||
gt1 = eventlet.spawn(sem.release)
|
||||
@@ -21,7 +21,7 @@ class TestSemaphore(LimitedTestCase):
|
||||
gt2.wait()
|
||||
|
||||
def test_bounded_with_zero_limit(self):
|
||||
sem = semaphore.BoundedSemaphore(0, 0)
|
||||
sem = semaphore.CappedSemaphore(0, 0)
|
||||
gt = eventlet.spawn(sem.acquire)
|
||||
sem.release()
|
||||
gt.wait()
|
||||
|
@@ -8,7 +8,7 @@ def allocate_lock():
|
||||
original_allocate_lock = thread.allocate_lock
|
||||
thread.allocate_lock = allocate_lock
|
||||
original_LockType = thread.LockType
|
||||
thread.LockType = coros.BoundedSemaphore
|
||||
thread.LockType = coros.CappedSemaphore
|
||||
|
||||
try:
|
||||
import os.path
|
||||
|
Reference in New Issue
Block a user