Refactor proxy for concurrency and code reuse

This commit is contained in:
Michael Barton
2011-03-03 21:12:15 +00:00
committed by Tarmac
5 changed files with 192 additions and 439 deletions

View File

@@ -922,6 +922,17 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
return running_time + time_per_request return running_time + time_per_request
class ContextPool(GreenPool):
"GreenPool subclassed to kill its coros when it gets gc'ed"
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
for coro in list(self.coroutines_running):
coro.kill()
class ModifiedParseResult(ParseResult): class ModifiedParseResult(ParseResult):
"Parse results class for urlparse." "Parse results class for urlparse."

View File

@@ -160,7 +160,7 @@ class ContainerController(object):
return resp return resp
if existed: if existed:
return HTTPNoContent(request=req) return HTTPNoContent(request=req)
return HTTPAccepted(request=req) return HTTPNotFound()
def PUT(self, req): def PUT(self, req):
"""Handle HTTP PUT request.""" """Handle HTTP PUT request."""

View File

@@ -31,7 +31,7 @@ import functools
from hashlib import md5 from hashlib import md5
from random import shuffle from random import shuffle
from eventlet import sleep, TimeoutError from eventlet import sleep, GreenPile, Queue, TimeoutError
from eventlet.timeout import Timeout from eventlet.timeout import Timeout
from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
HTTPNotFound, HTTPPreconditionFailed, \ HTTPNotFound, HTTPPreconditionFailed, \
@@ -42,7 +42,7 @@ from webob import Request, Response
from swift.common.ring import Ring from swift.common.ring import Ring
from swift.common.utils import get_logger, normalize_timestamp, split_path, \ from swift.common.utils import get_logger, normalize_timestamp, split_path, \
cache_from_env cache_from_env, ContextPool
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \ from swift.common.constraints import check_metadata, check_object_creation, \
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
@@ -266,6 +266,7 @@ class SegmentedIterable(object):
class Controller(object): class Controller(object):
"""Base WSGI controller class for the proxy""" """Base WSGI controller class for the proxy"""
server_type = _('Base')
def __init__(self, app): def __init__(self, app):
self.account_name = None self.account_name = None
@@ -359,8 +360,6 @@ class Controller(object):
path = '/%s' % account path = '/%s' % account
headers = {'x-cf-trans-id': self.trans_id} headers = {'x-cf-trans-id': self.trans_id}
for node in self.iter_nodes(partition, nodes, self.app.account_ring): for node in self.iter_nodes(partition, nodes, self.app.account_ring):
if self.error_limited(node):
continue
try: try:
with ConnectionTimeout(self.app.conn_timeout): with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'], conn = http_connect(node['ip'], node['port'],
@@ -433,8 +432,6 @@ class Controller(object):
attempts_left = self.app.container_ring.replica_count attempts_left = self.app.container_ring.replica_count
headers = {'x-cf-trans-id': self.trans_id} headers = {'x-cf-trans-id': self.trans_id}
for node in self.iter_nodes(partition, nodes, self.app.container_ring): for node in self.iter_nodes(partition, nodes, self.app.container_ring):
if self.error_limited(node):
continue
try: try:
with ConnectionTimeout(self.app.conn_timeout): with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'], conn = http_connect(node['ip'], node['port'],
@@ -490,36 +487,54 @@ class Controller(object):
:param ring: ring to get handoff nodes from :param ring: ring to get handoff nodes from
""" """
for node in nodes: for node in nodes:
yield node if not self.error_limited(node):
yield node
for node in ring.get_more_nodes(partition): for node in ring.get_more_nodes(partition):
yield node if not self.error_limited(node):
yield node
def get_update_nodes(self, partition, nodes, ring): def _make_request(self, nodes, part, method, path, headers, query):
""" Returns ring.replica_count nodes; the nodes will not be error for node in nodes:
limited, if possible. """ try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], part, method, path,
headers=headers, query_string=query)
conn.node = node
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
if 200 <= resp.status < 500:
return resp.status, resp.reason, resp.read()
elif resp.status == 507:
self.error_limit(node)
except Exception:
self.error_limit(node)
self.exception_occurred(node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': method, 'path': path})
def make_requests(self, req, ring, part, method, path, headers,
query_string=''):
""" """
Attempt to get a non error limited list of nodes. Sends an HTTP request to multiple nodes and aggregates the results.
It attempts the primary nodes concurrently, then iterates over the
handoff nodes as needed.
:param partition: partition for the nodes :param headers: a list of dicts, where each dict represents one
:param nodes: list of node dicts for the partition backend request that should be made.
:param ring: ring to get handoff nodes from :returns: a webob Response object
:returns: list of node dicts that are not error limited (if possible)
""" """
nodes = self.iter_nodes(part, ring.get_part_nodes(part), ring)
# make a copy so we don't modify caller's list pile = GreenPile(ring.replica_count)
nodes = list(nodes) for head in headers:
update_nodes = [] pile.spawn(self._make_request, nodes, part, method, path,
for node in self.iter_nodes(partition, nodes, ring): head, query_string)
if self.error_limited(node): response = [resp for resp in pile if resp]
continue while len(response) < ring.replica_count:
update_nodes.append(node) response.append((503, '', ''))
if len(update_nodes) >= ring.replica_count: statuses, reasons, bodies = zip(*response)
break return self.best_response(req, statuses, reasons, bodies,
while len(update_nodes) < ring.replica_count: '%s %s' % (self.server_type, req.method))
node = nodes.pop()
if node not in update_nodes:
update_nodes.append(node)
return update_nodes
def best_response(self, req, statuses, reasons, bodies, server_type, def best_response(self, req, statuses, reasons, bodies, server_type,
etag=None): etag=None):
@@ -659,6 +674,7 @@ class Controller(object):
class ObjectController(Controller): class ObjectController(Controller):
"""WSGI controller for object requests.""" """WSGI controller for object requests."""
server_type = _('Object')
def __init__(self, app, account_name, container_name, object_name, def __init__(self, app, account_name, container_name, object_name,
**kwargs): **kwargs):
@@ -667,37 +683,6 @@ class ObjectController(Controller):
self.container_name = unquote(container_name) self.container_name = unquote(container_name)
self.object_name = unquote(object_name) self.object_name = unquote(object_name)
def node_post_or_delete(self, req, partition, node, path):
"""
Handle common POST/DELETE functionality
:param req: webob.Request object
:param partition: partition for the object
:param node: node dictionary for the object
:param path: path to send for the request
"""
if self.error_limited(node):
return 500, '', ''
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'],
partition, req.method, path, req.headers)
with Timeout(self.app.node_timeout):
response = conn.getresponse()
body = response.read()
if response.status == 507:
self.error_limit(node)
elif response.status >= 500:
self.error_occurred(node,
_('ERROR %(status)d %(body)s From Object Server') %
{'status': response.status, 'body': body[:1024]})
return response.status, response.reason, body
except (Exception, TimeoutError):
self.exception_occurred(node, _('Object'),
_('Trying to %(method)s %(path)s') %
{'method': req.method, 'path': req.path})
return 500, '', ''
def GETorHEAD(self, req): def GETorHEAD(self, req):
"""Handle HTTP GET or HEAD requests.""" """Handle HTTP GET or HEAD requests."""
if 'swift.authorize' in req.environ: if 'swift.authorize' in req.environ:
@@ -874,35 +859,50 @@ class ObjectController(Controller):
return aresp return aresp
if not containers: if not containers:
return HTTPNotFound(request=req) return HTTPNotFound(request=req)
containers = self.get_update_nodes(container_partition, containers,
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes( partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name) self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time()) req.headers['X-Timestamp'] = normalize_timestamp(time.time())
statuses = [] headers = []
reasons = [] for container in containers:
bodies = [] nheaders = dict(req.headers.iteritems())
for node in self.iter_nodes(partition, nodes, self.app.object_ring): nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
container = containers.pop() nheaders['X-Container-Partition'] = container_partition
req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container nheaders['X-Container-Device'] = container['device']
req.headers['X-Container-Partition'] = container_partition headers.append(nheaders)
req.headers['X-Container-Device'] = container['device'] return self.make_requests(req, self.app.object_ring,
status, reason, body = \ partition, 'POST', req.path_info, headers)
self.node_post_or_delete(req, partition, node, req.path_info)
if 200 <= status < 300 or 400 <= status < 500: def _send_file(self, conn, path):
statuses.append(status) """Method for a file PUT coro"""
reasons.append(reason) while True:
bodies.append(body) chunk = conn.queue.get()
else: if not conn.failed:
containers.insert(0, container) try:
if not containers: with ChunkWriteTimeout(self.app.node_timeout):
break conn.send(chunk)
while len(statuses) < len(nodes): except (Exception, ChunkWriteTimeout):
statuses.append(503) conn.failed = True
reasons.append('') self.exception_occurred(conn.node, _('Object'),
bodies.append('') _('Trying to write to %s') % path)
return self.best_response(req, statuses, reasons, conn.queue.task_done()
bodies, _('Object POST'))
def _connect_put_node(self, nodes, part, path, headers):
"""Method for a file PUT connect"""
for node in nodes:
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], part, 'PUT', path, headers)
with Timeout(self.app.node_timeout):
resp = conn.getexpect()
if resp.status == 100:
conn.node = node
return conn
elif resp.status == 507:
self.error_limit(node)
except:
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % path)
@public @public
@delay_denial @delay_denial
@@ -916,8 +916,6 @@ class ObjectController(Controller):
return aresp return aresp
if not containers: if not containers:
return HTTPNotFound(request=req) return HTTPNotFound(request=req)
containers = self.get_update_nodes(container_partition, containers,
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes( partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name) self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time()) req.headers['X-Timestamp'] = normalize_timestamp(time.time())
@@ -925,15 +923,12 @@ class ObjectController(Controller):
content_type_manually_set = True content_type_manually_set = True
if not req.headers.get('content-type'): if not req.headers.get('content-type'):
guessed_type, _junk = mimetypes.guess_type(req.path_info) guessed_type, _junk = mimetypes.guess_type(req.path_info)
if not guessed_type: req.headers['Content-Type'] = guessed_type or \
req.headers['Content-Type'] = 'application/octet-stream' 'application/octet-stream'
else:
req.headers['Content-Type'] = guessed_type
content_type_manually_set = False content_type_manually_set = False
error_response = check_object_creation(req, self.object_name) error_response = check_object_creation(req, self.object_name)
if error_response: if error_response:
return error_response return error_response
conns = []
data_source = \ data_source = \
iter(lambda: req.body_file.read(self.app.client_chunk_size), '') iter(lambda: req.body_file.read(self.app.client_chunk_size), '')
source_header = req.headers.get('X-Copy-From') source_header = req.headers.get('X-Copy-From')
@@ -984,75 +979,57 @@ class ObjectController(Controller):
if k.lower().startswith('x-object-meta-'): if k.lower().startswith('x-object-meta-'):
new_req.headers[k] = v new_req.headers[k] = v
req = new_req req = new_req
for node in self.iter_nodes(partition, nodes, self.app.object_ring): node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
container = containers.pop() pile = GreenPile(len(nodes))
req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container for container in containers:
req.headers['X-Container-Partition'] = container_partition nheaders = dict(req.headers.iteritems())
req.headers['X-Container-Device'] = container['device'] nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
req.headers['Expect'] = '100-continue' nheaders['X-Container-Partition'] = container_partition
resp = conn = None nheaders['X-Container-Device'] = container['device']
if not self.error_limited(node): nheaders['Expect'] = '100-continue'
try: pile.spawn(self._connect_put_node, node_iter, partition,
with ConnectionTimeout(self.app.conn_timeout): req.path_info, nheaders)
conn = http_connect(node['ip'], node['port'], conns = [conn for conn in pile if conn]
node['device'], partition, 'PUT',
req.path_info, req.headers)
conn.node = node
with Timeout(self.app.node_timeout):
resp = conn.getexpect()
except (Exception, TimeoutError):
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % req.path)
if conn and resp:
if resp.status == 100:
conns.append(conn)
if not containers:
break
continue
elif resp.status == 507:
self.error_limit(node)
containers.insert(0, container)
if len(conns) <= len(nodes) / 2: if len(conns) <= len(nodes) / 2:
self.app.logger.error( self.app.logger.error(
_('Object PUT returning 503, %(conns)s/%(nodes)s ' _('Object PUT returning 503, %(conns)s/%(nodes)s '
'required connections'), 'required connections'),
{'conns': len(conns), 'nodes': len(nodes) // 2 + 1}) {'conns': len(conns), 'nodes': len(nodes) // 2 + 1})
return HTTPServiceUnavailable(request=req) return HTTPServiceUnavailable(request=req)
chunked = req.headers.get('transfer-encoding')
try: try:
req.bytes_transferred = 0 with ContextPool(len(nodes)) as pool:
while True: for conn in conns:
with ChunkReadTimeout(self.app.client_timeout): conn.failed = False
try: conn.queue = Queue(self.app.put_queue_depth)
chunk = data_source.next() pool.spawn(self._send_file, conn, req.path)
except StopIteration: req.bytes_transferred = 0
if req.headers.get('transfer-encoding'): while True:
chunk = '' with ChunkReadTimeout(self.app.client_timeout):
else: try:
chunk = next(data_source)
except StopIteration:
if chunked:
[conn.queue.put('0\r\n\r\n') for conn in conns]
break break
len_chunk = len(chunk) req.bytes_transferred += len(chunk)
req.bytes_transferred += len_chunk if req.bytes_transferred > MAX_FILE_SIZE:
if req.bytes_transferred > MAX_FILE_SIZE: return HTTPRequestEntityTooLarge(request=req)
return HTTPRequestEntityTooLarge(request=req) for conn in list(conns):
for conn in list(conns): if not conn.failed:
try: conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
with ChunkWriteTimeout(self.app.node_timeout): if chunked else chunk)
if req.headers.get('transfer-encoding'): else:
conn.send('%x\r\n%s\r\n' % (len_chunk, chunk)) conns.remove(conn)
else: if len(conns) <= len(nodes) / 2:
conn.send(chunk) self.app.logger.error(_('Object PUT exceptions during'
except (Exception, TimeoutError): ' send, %(conns)s/%(nodes)s required connections'),
self.exception_occurred(conn.node, _('Object'), {'conns': len(conns), 'nodes': len(nodes) / 2 + 1})
_('Trying to write to %s') % req.path) return HTTPServiceUnavailable(request=req)
conns.remove(conn) for conn in conns:
if len(conns) <= len(nodes) / 2: if conn.queue.unfinished_tasks:
self.app.logger.error( conn.queue.join()
_('Object PUT exceptions during send, ' conns = [conn for conn in conns if not conn.failed]
'%(conns)s/%(nodes)s required connections'),
{'conns': len(conns),
'nodes': len(nodes) // 2 + 1})
return HTTPServiceUnavailable(request=req)
if req.headers.get('transfer-encoding') and chunk == '':
break
except ChunkReadTimeout, err: except ChunkReadTimeout, err:
self.app.logger.warn( self.app.logger.warn(
_('ERROR Client read timeout (%ss)'), err.seconds) _('ERROR Client read timeout (%ss)'), err.seconds)
@@ -1122,35 +1099,18 @@ class ObjectController(Controller):
return aresp return aresp
if not containers: if not containers:
return HTTPNotFound(request=req) return HTTPNotFound(request=req)
containers = self.get_update_nodes(container_partition, containers,
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes( partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name) self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time()) req.headers['X-Timestamp'] = normalize_timestamp(time.time())
statuses = [] headers = []
reasons = [] for container in containers:
bodies = [] nheaders = dict(req.headers.iteritems())
for node in self.iter_nodes(partition, nodes, self.app.object_ring): nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
container = containers.pop() nheaders['X-Container-Partition'] = container_partition
req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container nheaders['X-Container-Device'] = container['device']
req.headers['X-Container-Partition'] = container_partition headers.append(nheaders)
req.headers['X-Container-Device'] = container['device'] return self.make_requests(req, self.app.object_ring,
status, reason, body = \ partition, 'DELETE', req.path_info, headers)
self.node_post_or_delete(req, partition, node, req.path_info)
if 200 <= status < 300 or 400 <= status < 500:
statuses.append(status)
reasons.append(reason)
bodies.append(body)
else:
containers.insert(0, container)
if not containers:
break
while len(statuses) < len(nodes):
statuses.append(503)
reasons.append('')
bodies.append('')
return self.best_response(req, statuses, reasons, bodies,
_('Object DELETE'))
@public @public
@delay_denial @delay_denial
@@ -1184,6 +1144,7 @@ class ObjectController(Controller):
class ContainerController(Controller): class ContainerController(Controller):
"""WSGI controller for container requests""" """WSGI controller for container requests"""
server_type = _('Container')
# Ensure these are all lowercase # Ensure these are all lowercase
pass_through_headers = ['x-container-read', 'x-container-write'] pass_through_headers = ['x-container-read', 'x-container-write']
@@ -1259,59 +1220,25 @@ class ContainerController(Controller):
account_partition, accounts = self.account_info(self.account_name) account_partition, accounts = self.account_info(self.account_name)
if not accounts: if not accounts:
return HTTPNotFound(request=req) return HTTPNotFound(request=req)
accounts = self.get_update_nodes(account_partition, accounts,
self.app.account_ring)
container_partition, containers = self.app.container_ring.get_nodes( container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name) self.account_name, self.container_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()), headers = []
'x-cf-trans-id': self.trans_id} for account in accounts:
headers.update(value for value in req.headers.iteritems() nheaders = {'X-Timestamp': normalize_timestamp(time.time()),
if value[0].lower() in self.pass_through_headers or 'x-cf-trans-id': self.trans_id,
value[0].lower().startswith('x-container-meta-')) 'X-Account-Host': '%(ip)s:%(port)s' % account,
statuses = [] 'X-Account-Partition': account_partition,
reasons = [] 'X-Account-Device': account['device']}
bodies = [] nheaders.update(value for value in req.headers.iteritems()
for node in self.iter_nodes(container_partition, containers, if value[0].lower() in self.pass_through_headers or
self.app.container_ring): value[0].lower().startswith('x-container-meta-'))
if self.error_limited(node): headers.append(nheaders)
continue
try:
account = accounts.pop()
headers['X-Account-Host'] = '%(ip)s:%(port)s' % account
headers['X-Account-Partition'] = account_partition
headers['X-Account-Device'] = account['device']
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], container_partition, 'PUT',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
else:
if source.status == 507:
self.error_limit(node)
accounts.insert(0, account)
except (Exception, TimeoutError):
accounts.insert(0, account)
self.exception_occurred(node, _('Container'),
_('Trying to PUT to %s') % req.path)
if not accounts:
break
while len(statuses) < len(containers):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache: if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name, cache_key = get_container_memcache_key(self.account_name,
self.container_name) self.container_name)
self.app.memcache.delete(cache_key) self.app.memcache.delete(cache_key)
return self.best_response(req, statuses, reasons, bodies, return self.make_requests(req, self.app.container_ring,
_('Container PUT')) container_partition, 'PUT', req.path_info, headers)
@public @public
def POST(self, req): def POST(self, req):
@@ -1330,43 +1257,13 @@ class ContainerController(Controller):
headers.update(value for value in req.headers.iteritems() headers.update(value for value in req.headers.iteritems()
if value[0].lower() in self.pass_through_headers or if value[0].lower() in self.pass_through_headers or
value[0].lower().startswith('x-container-meta-')) value[0].lower().startswith('x-container-meta-'))
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(container_partition, containers,
self.app.container_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], container_partition, 'POST',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
elif source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Container'),
_('Trying to POST %s') % req.path)
if len(statuses) >= len(containers):
break
while len(statuses) < len(containers):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache: if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name, cache_key = get_container_memcache_key(self.account_name,
self.container_name) self.container_name)
self.app.memcache.delete(cache_key) self.app.memcache.delete(cache_key)
return self.best_response(req, statuses, reasons, bodies, return self.make_requests(req, self.app.container_ring,
_('Container POST')) container_partition, 'POST', req.path_info,
[headers] * len(containers))
@public @public
def DELETE(self, req): def DELETE(self, req):
@@ -1374,65 +1271,21 @@ class ContainerController(Controller):
account_partition, accounts = self.account_info(self.account_name) account_partition, accounts = self.account_info(self.account_name)
if not accounts: if not accounts:
return HTTPNotFound(request=req) return HTTPNotFound(request=req)
accounts = self.get_update_nodes(account_partition, accounts,
self.app.account_ring)
container_partition, containers = self.app.container_ring.get_nodes( container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name) self.account_name, self.container_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()), headers = []
'x-cf-trans-id': self.trans_id} for account in accounts:
statuses = [] headers.append({'X-Timestamp': normalize_timestamp(time.time()),
reasons = [] 'X-Cf-Trans-Id': self.trans_id,
bodies = [] 'X-Account-Host': '%(ip)s:%(port)s' % account,
for node in self.iter_nodes(container_partition, containers, 'X-Account-Partition': account_partition,
self.app.container_ring): 'X-Account-Device': account['device']})
if self.error_limited(node):
continue
try:
account = accounts.pop()
headers['X-Account-Host'] = '%(ip)s:%(port)s' % account
headers['X-Account-Partition'] = account_partition
headers['X-Account-Device'] = account['device']
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], container_partition, 'DELETE',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
else:
if source.status == 507:
self.error_limit(node)
accounts.insert(0, account)
except (Exception, TimeoutError):
accounts.insert(0, account)
self.exception_occurred(node, _('Container'),
_('Trying to DELETE %s') % req.path)
if not accounts:
break
while len(statuses) < len(containers):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache: if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name, cache_key = get_container_memcache_key(self.account_name,
self.container_name) self.container_name)
self.app.memcache.delete(cache_key) self.app.memcache.delete(cache_key)
resp = self.best_response(req, statuses, reasons, bodies, resp = self.make_requests(req, self.app.container_ring,
_('Container DELETE')) container_partition, 'DELETE', req.path_info, headers)
if 200 <= resp.status_int <= 299:
for status in statuses:
if status < 200 or status > 299:
# If even one node doesn't do the delete, we can't be sure
# what the outcome will be once everything is in sync; so
# we 503.
self.app.logger.error(_('Returning 503 because not all '
'container nodes confirmed DELETE'))
return HTTPServiceUnavailable(request=req)
if resp.status_int == 202: # Indicates no server had the container if resp.status_int == 202: # Indicates no server had the container
return HTTPNotFound(request=req) return HTTPNotFound(request=req)
return resp return resp
@@ -1440,6 +1293,7 @@ class ContainerController(Controller):
class AccountController(Controller): class AccountController(Controller):
"""WSGI controller for account requests""" """WSGI controller for account requests"""
server_type = _('Account')
def __init__(self, app, account_name, **kwargs): def __init__(self, app, account_name, **kwargs):
Controller.__init__(self, app) Controller.__init__(self, app)
@@ -1470,42 +1324,10 @@ class AccountController(Controller):
'x-cf-trans-id': self.trans_id} 'x-cf-trans-id': self.trans_id}
headers.update(value for value in req.headers.iteritems() headers.update(value for value in req.headers.iteritems()
if value[0].lower().startswith('x-account-meta-')) if value[0].lower().startswith('x-account-meta-'))
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(account_partition, accounts,
self.app.account_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], account_partition, 'PUT',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
else:
if source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Account'),
_('Trying to PUT to %s') % req.path)
if len(statuses) >= len(accounts):
break
while len(statuses) < len(accounts):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache: if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.best_response(req, statuses, reasons, bodies, return self.make_requests(req, self.app.account_ring, account_partition,
_('Account PUT')) 'PUT', req.path_info, [headers] * len(accounts))
@public @public
def POST(self, req): def POST(self, req):
@@ -1519,41 +1341,10 @@ class AccountController(Controller):
'X-CF-Trans-Id': self.trans_id} 'X-CF-Trans-Id': self.trans_id}
headers.update(value for value in req.headers.iteritems() headers.update(value for value in req.headers.iteritems()
if value[0].lower().startswith('x-account-meta-')) if value[0].lower().startswith('x-account-meta-'))
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(account_partition, accounts,
self.app.account_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], account_partition, 'POST',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
elif source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Account'),
_('Trying to POST %s') % req.path)
if len(statuses) >= len(accounts):
break
while len(statuses) < len(accounts):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache: if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.best_response(req, statuses, reasons, bodies, return self.make_requests(req, self.app.account_ring, account_partition,
_('Account POST')) 'POST', req.path_info, [headers] * len(accounts))
@public @public
def DELETE(self, req): def DELETE(self, req):
@@ -1564,41 +1355,10 @@ class AccountController(Controller):
self.app.account_ring.get_nodes(self.account_name) self.app.account_ring.get_nodes(self.account_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()), headers = {'X-Timestamp': normalize_timestamp(time.time()),
'X-CF-Trans-Id': self.trans_id} 'X-CF-Trans-Id': self.trans_id}
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(account_partition, accounts,
self.app.account_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], account_partition, 'DELETE',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
elif source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Account'),
_('Trying to DELETE %s') % req.path)
if len(statuses) >= len(accounts):
break
while len(statuses) < len(accounts):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache: if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.best_response(req, statuses, reasons, bodies, return self.make_requests(req, self.app.account_ring, account_partition,
_('Account DELETE')) 'DELETE', req.path_info, [headers] * len(accounts))
class BaseApplication(object): class BaseApplication(object):
@@ -1624,6 +1384,7 @@ class BaseApplication(object):
self.node_timeout = int(conf.get('node_timeout', 10)) self.node_timeout = int(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.client_timeout = int(conf.get('client_timeout', 60)) self.client_timeout = int(conf.get('client_timeout', 60))
self.put_queue_depth = int(conf.get('put_queue_depth', 10))
self.object_chunk_size = int(conf.get('object_chunk_size', 65536)) self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
self.client_chunk_size = int(conf.get('client_chunk_size', 65536)) self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
self.log_headers = conf.get('log_headers') == 'True' self.log_headers = conf.get('log_headers') == 'True'

View File

@@ -70,17 +70,6 @@ class TestContainerFailures(unittest.TestCase):
self.assert_(object1 in [o['name'] for o in self.assert_(object1 in [o['name'] for o in
client.get_container(self.url, self.token, container)[1]]) client.get_container(self.url, self.token, container)[1]])
# This fails because all three nodes have to indicate deletion before
# we tell the user it worked. Since the first node 409s (it hasn't got
# the update that the object was deleted yet), the whole must 503
# (until every is synced up, then the delete would work).
exc = None
try:
client.delete_container(self.url, self.token, container)
except client.ClientException, err:
exc = err
self.assert_(exc)
self.assert_(exc.http_status, 503)
# Unfortunately, the following might pass or fail, depending on the # Unfortunately, the following might pass or fail, depending on the
# position of the account server associated with the first container # position of the account server associated with the first container
# server we had killed. If the associated happens to be the first # server we had killed. If the associated happens to be the first
@@ -144,17 +133,6 @@ class TestContainerFailures(unittest.TestCase):
self.assert_(object1 not in [o['name'] for o in self.assert_(object1 not in [o['name'] for o in
client.get_container(self.url, self.token, container)[1]]) client.get_container(self.url, self.token, container)[1]])
# This fails because all three nodes have to indicate deletion before
# we tell the user it worked. Since the first node 409s (it hasn't got
# the update that the object was deleted yet), the whole must 503
# (until every is synced up, then the delete would work).
exc = None
try:
client.delete_container(self.url, self.token, container)
except client.ClientException, err:
exc = err
self.assert_(exc)
self.assert_(exc.http_status, 503)
# Unfortunately, the following might pass or fail, depending on the # Unfortunately, the following might pass or fail, depending on the
# position of the account server associated with the first container # position of the account server associated with the first container
# server we had killed. If the associated happens to be the first # server we had killed. If the associated happens to be the first

View File

@@ -249,6 +249,9 @@ class FakeRing(object):
{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'} {'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
return 1, devs return 1, devs
def get_part_nodes(self, part):
return self.get_nodes('blah')[1]
def get_more_nodes(self, nodes): def get_more_nodes(self, nodes):
# 9 is the true cap # 9 is the true cap
for x in xrange(3, min(3 + self.max_more_nodes, 9)): for x in xrange(3, min(3 + self.max_more_nodes, 9)):
@@ -2735,7 +2738,7 @@ class TestContainerController(unittest.TestCase):
self.assert_status_map(controller.DELETE, self.assert_status_map(controller.DELETE,
(200, 204, 204, 204), 204) (200, 204, 204, 204), 204)
self.assert_status_map(controller.DELETE, self.assert_status_map(controller.DELETE,
(200, 204, 204, 503), 503) (200, 204, 204, 503), 204)
self.assert_status_map(controller.DELETE, self.assert_status_map(controller.DELETE,
(200, 204, 503, 503), 503) (200, 204, 503, 503), 503)
self.assert_status_map(controller.DELETE, self.assert_status_map(controller.DELETE,