Removed ConnectionPool which didn't seem usable, and also Pool.fan which has been supplanted by better abstractions.
This commit is contained in:
@@ -39,13 +39,13 @@ class AllFailed(FanFailed):
|
|||||||
class Pool(object):
|
class Pool(object):
|
||||||
"""
|
"""
|
||||||
When using the pool, if you do a get, you should ALWAYS do a put.
|
When using the pool, if you do a get, you should ALWAYS do a put.
|
||||||
The pattern is:
|
The pattern is::
|
||||||
|
|
||||||
thing = self.pool.get()
|
thing = self.pool.get()
|
||||||
try:
|
try:
|
||||||
# do stuff
|
thing.method()
|
||||||
finally:
|
finally:
|
||||||
self.pool.put(thing)
|
self.pool.put(thing)
|
||||||
|
|
||||||
The maximum size of the pool can be modified at runtime via the max_size attribute.
|
The maximum size of the pool can be modified at runtime via the max_size attribute.
|
||||||
Adjusting this number does not affect existing items checked out of the pool, nor
|
Adjusting this number does not affect existing items checked out of the pool, nor
|
||||||
@@ -123,44 +123,6 @@ class Pool(object):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError("Implement in subclass")
|
raise NotImplementedError("Implement in subclass")
|
||||||
|
|
||||||
def fan(self, block, input_list):
|
|
||||||
queue = coros.queue(0)
|
|
||||||
results = []
|
|
||||||
exceptional_results = 0
|
|
||||||
for index, input_item in enumerate(input_list):
|
|
||||||
pool_item = self.get()
|
|
||||||
|
|
||||||
## Fan out
|
|
||||||
api.spawn(
|
|
||||||
self._invoke, block, pool_item, input_item, index, queue)
|
|
||||||
|
|
||||||
## Fan back in
|
|
||||||
for i in range(len(input_list)):
|
|
||||||
## Wait for all guys to send to the queue
|
|
||||||
index, value = queue.wait()
|
|
||||||
if isinstance(value, Exception):
|
|
||||||
exceptional_results += 1
|
|
||||||
results.append((index, value))
|
|
||||||
|
|
||||||
results.sort()
|
|
||||||
results = [value for index, value in results]
|
|
||||||
|
|
||||||
if exceptional_results:
|
|
||||||
if exceptional_results == len(results):
|
|
||||||
raise AllFailed(results)
|
|
||||||
raise SomeFailed(results)
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _invoke(self, block, pool_item, input_item, index, queue):
|
|
||||||
try:
|
|
||||||
result = block(pool_item, input_item)
|
|
||||||
except Exception, e:
|
|
||||||
self.put(pool_item)
|
|
||||||
queue.send((index, e))
|
|
||||||
return
|
|
||||||
self.put(pool_item)
|
|
||||||
queue.send((index, result))
|
|
||||||
|
|
||||||
|
|
||||||
class Token(object):
|
class Token(object):
|
||||||
pass
|
pass
|
||||||
@@ -175,32 +137,6 @@ class TokenPool(Pool):
|
|||||||
return Token()
|
return Token()
|
||||||
|
|
||||||
|
|
||||||
class ConnectionPool(Pool):
|
|
||||||
"""A Pool which can limit the number of concurrent http operations
|
|
||||||
being made to a given server.
|
|
||||||
|
|
||||||
*NOTE: *TODO:
|
|
||||||
|
|
||||||
This does NOT currently keep sockets open. It discards the created
|
|
||||||
http object when it is put back in the pool. This is because we do
|
|
||||||
not yet have a combination of http clients and servers which can work
|
|
||||||
together to do HTTP keepalive sockets without errors.
|
|
||||||
"""
|
|
||||||
def __init__(self, proto, netloc, use_proxy, min_size=0, max_size=4):
|
|
||||||
self.proto = proto
|
|
||||||
self.netloc = netloc
|
|
||||||
self.use_proxy = use_proxy
|
|
||||||
Pool.__init__(self, min_size, max_size)
|
|
||||||
|
|
||||||
def create(self):
|
|
||||||
import httpc
|
|
||||||
return httpc.make_connection(self.proto, self.netloc, self.use_proxy)
|
|
||||||
|
|
||||||
def put(self, item):
|
|
||||||
## Discard item, create a new connection for the pool
|
|
||||||
Pool.put(self, self.create())
|
|
||||||
|
|
||||||
|
|
||||||
class ExceptionWrapper(object):
|
class ExceptionWrapper(object):
|
||||||
def __init__(self, e):
|
def __init__(self, e):
|
||||||
self.e = e
|
self.e = e
|
||||||
|
Reference in New Issue
Block a user