Merge "container_info returns a dict"

This commit is contained in:
Jenkins 2012-09-11 20:24:13 +00:00 committed by Gerrit Code Review
commit 5c67f1a695
6 changed files with 108 additions and 90 deletions
swift
common/middleware
proxy/controllers
test/unit
common/middleware
proxy

@ -118,7 +118,8 @@ class RateLimitMiddleware(object):
container_name) container_name)
container_info = self.memcache_client.get(memcache_key) container_info = self.memcache_client.get(memcache_key)
if isinstance(container_info, dict): if isinstance(container_info, dict):
container_size = container_info.get('container_size', 0) container_size = container_info.get(
'count', container_info.get('container_size', 0))
container_rate = self.get_container_maxrate(container_size) container_rate = self.get_container_maxrate(container_size)
if container_rate: if container_rate:
keys.append(("ratelimit/%s/%s" % (account_name, keys.append(("ratelimit/%s/%s" % (account_name,

@ -266,89 +266,79 @@ class Controller(object):
:param account: account name for the container :param account: account name for the container
:param container: container name to look up :param container: container name to look up
:returns: tuple of (container partition, container nodes, container :returns: dict containing at least container partition ('partition'),
read acl, container write acl, container sync key) or (None, container nodes ('containers'), container read
None, None, None, None) if the container does not exist acl ('read_acl'), container write acl ('write_acl'),
and container sync key ('sync_key').
Values are set to None if the container does not exist.
""" """
partition, nodes = self.app.container_ring.get_nodes( part, nodes = self.app.container_ring.get_nodes(account, container)
account, container)
path = '/%s/%s' % (account, container) path = '/%s/%s' % (account, container)
container_info = {'status': 0, 'read_acl': None,
'write_acl': None, 'sync_key': None,
'count': None, 'bytes': None,
'versions': None, 'partition': None,
'nodes': None}
if self.app.memcache: if self.app.memcache:
cache_key = get_container_memcache_key(account, container) cache_key = get_container_memcache_key(account, container)
cache_value = self.app.memcache.get(cache_key) cache_value = self.app.memcache.get(cache_key)
if isinstance(cache_value, dict): if isinstance(cache_value, dict):
status = cache_value['status'] if 'container_size' in cache_value:
read_acl = cache_value['read_acl'] cache_value['count'] = cache_value['container_size']
write_acl = cache_value['write_acl'] if is_success(cache_value['status']):
sync_key = cache_value.get('sync_key') container_info.update(cache_value)
versions = cache_value.get('versions') container_info['partition'] = part
if status == HTTP_OK: container_info['nodes'] = nodes
return partition, nodes, read_acl, write_acl, sync_key, \ return container_info
versions
elif status == HTTP_NOT_FOUND:
return None, None, None, None, None, None
if not self.account_info(account, autocreate=account_autocreate)[1]: if not self.account_info(account, autocreate=account_autocreate)[1]:
return None, None, None, None, None, None return container_info
result_code = 0
read_acl = None
write_acl = None
sync_key = None
container_size = None
versions = None
attempts_left = len(nodes) attempts_left = len(nodes)
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
iternodes = self.iter_nodes(partition, nodes, self.app.container_ring) for node in self.iter_nodes(part, nodes, self.app.container_ring):
while attempts_left > 0:
try:
node = iternodes.next()
except StopIteration:
break
attempts_left -= 1
try: try:
with ConnectionTimeout(self.app.conn_timeout): with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'], conn = http_connect(node['ip'], node['port'],
node['device'], partition, 'HEAD', path, headers) node['device'], part, 'HEAD',
path, headers)
with Timeout(self.app.node_timeout): with Timeout(self.app.node_timeout):
resp = conn.getresponse() resp = conn.getresponse()
body = resp.read() body = resp.read()
if is_success(resp.status): if is_success(resp.status):
result_code = HTTP_OK container_info.update({
read_acl = resp.getheader('x-container-read') 'status': HTTP_OK,
write_acl = resp.getheader('x-container-write') 'read_acl': resp.getheader('x-container-read'),
sync_key = resp.getheader('x-container-sync-key') 'write_acl': resp.getheader('x-container-write'),
container_size = \ 'sync_key': resp.getheader('x-container-sync-key'),
resp.getheader('X-Container-Object-Count') 'count': resp.getheader('x-container-object-count'),
versions = resp.getheader('x-versions-location') 'bytes': resp.getheader('x-container-bytes-used'),
break 'versions': resp.getheader('x-versions-location')})
elif resp.status == HTTP_NOT_FOUND: break
if result_code == 0: elif resp.status == HTTP_NOT_FOUND:
result_code = HTTP_NOT_FOUND container_info['status'] = HTTP_NOT_FOUND
elif result_code != HTTP_NOT_FOUND: else:
result_code = -1 container_info['status'] = -1
elif resp.status == HTTP_INSUFFICIENT_STORAGE: if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node) self.error_limit(node)
continue
else:
result_code = -1
except (Exception, Timeout): except (Exception, Timeout):
self.exception_occurred(node, _('Container'), self.exception_occurred(
node, _('Container'),
_('Trying to get container info for %s') % path) _('Trying to get container info for %s') % path)
if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND): attempts_left -= 1
if result_code == HTTP_OK: if attempts_left <= 0:
cache_timeout = self.app.recheck_container_existence break
else: if self.app.memcache:
cache_timeout = self.app.recheck_container_existence * 0.1 if container_info['status'] == HTTP_OK:
self.app.memcache.set(cache_key, self.app.memcache.set(
{'status': result_code, cache_key, container_info,
'read_acl': read_acl, timeout=self.app.recheck_container_existence)
'write_acl': write_acl, elif container_info['status'] == HTTP_NOT_FOUND:
'sync_key': sync_key, self.app.memcache.set(
'container_size': container_size, cache_key, container_info,
'versions': versions}, timeout=self.app.recheck_container_existence * 0.1)
timeout=cache_timeout) if container_info['status'] == HTTP_OK:
if result_code == HTTP_OK: container_info['partition'] = part
return partition, nodes, read_acl, write_acl, sync_key, versions container_info['nodes'] = nodes
return None, None, None, None, None, None return container_info
def iter_nodes(self, partition, nodes, ring): def iter_nodes(self, partition, nodes, ring):
""" """

@ -82,7 +82,8 @@ class ContainerController(Controller):
'read_acl': resp.headers.get('x-container-read'), 'read_acl': resp.headers.get('x-container-read'),
'write_acl': resp.headers.get('x-container-write'), 'write_acl': resp.headers.get('x-container-write'),
'sync_key': resp.headers.get('x-container-sync-key'), 'sync_key': resp.headers.get('x-container-sync-key'),
'container_size': resp.headers.get('x-container-object-count'), 'count': resp.headers.get('x-container-object-count'),
'bytes': resp.headers.get('x-container-bytes-used'),
'versions': resp.headers.get('x-versions-location')}, 'versions': resp.headers.get('x-versions-location')},
timeout=self.app.recheck_container_existence) timeout=self.app.recheck_container_existence)
@ -173,9 +174,8 @@ class ContainerController(Controller):
'Connection': 'close'} 'Connection': 'close'}
self.transfer_headers(req.headers, headers) self.transfer_headers(req.headers, headers)
if self.app.memcache: if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name, self.app.memcache.delete(get_container_memcache_key(
self.container_name) self.account_name, self.container_name))
self.app.memcache.delete(cache_key)
resp = self.make_requests(req, self.app.container_ring, resp = self.make_requests(req, self.app.container_ring,
container_partition, 'POST', req.path_info, container_partition, 'POST', req.path_info,
[headers] * len(containers)) [headers] * len(containers))

@ -278,8 +278,9 @@ class ObjectController(Controller):
def GETorHEAD(self, req): def GETorHEAD(self, req):
"""Handle HTTP GET or HEAD requests.""" """Handle HTTP GET or HEAD requests."""
_junk, _junk, req.acl, _junk, _junk, object_versions = \ container_info = self.container_info(self.account_name,
self.container_info(self.account_name, self.container_name) self.container_name)
req.acl = container_info['read_acl']
if 'swift.authorize' in req.environ: if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req) aresp = req.environ['swift.authorize'](req)
if aresp: if aresp:
@ -413,9 +414,12 @@ class ObjectController(Controller):
error_response = check_metadata(req, 'object') error_response = check_metadata(req, 'object')
if error_response: if error_response:
return error_response return error_response
container_partition, containers, _junk, req.acl, _junk, _junk = \ container_info = self.container_info(
self.container_info(self.account_name, self.container_name, self.account_name, self.container_name,
account_autocreate=self.app.account_autocreate) account_autocreate=self.app.account_autocreate)
container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']
if 'swift.authorize' in req.environ: if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req) aresp = req.environ['swift.authorize'](req)
if aresp: if aresp:
@ -498,10 +502,14 @@ class ObjectController(Controller):
@delay_denial @delay_denial
def PUT(self, req): def PUT(self, req):
"""HTTP PUT request handler.""" """HTTP PUT request handler."""
(container_partition, containers, _junk, req.acl, container_info = self.container_info(
req.environ['swift_sync_key'], object_versions) = \ self.account_name, self.container_name,
self.container_info(self.account_name, self.container_name, account_autocreate=self.app.account_autocreate)
account_autocreate=self.app.account_autocreate) container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
object_versions = container_info['versions']
if 'swift.authorize' in req.environ: if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req) aresp = req.environ['swift.authorize'](req)
if aresp: if aresp:
@ -776,9 +784,13 @@ class ObjectController(Controller):
@delay_denial @delay_denial
def DELETE(self, req): def DELETE(self, req):
"""HTTP DELETE request handler.""" """HTTP DELETE request handler."""
(container_partition, containers, _junk, req.acl, container_info = self.container_info(self.account_name,
req.environ['swift_sync_key'], object_versions) = \ self.container_name)
self.container_info(self.account_name, self.container_name) container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
object_versions = container_info['versions']
if object_versions: if object_versions:
# this is a version manifest and needs to be handled differently # this is a version manifest and needs to be handled differently
lcontainer = object_versions.split('/')[0] lcontainer = object_versions.split('/')[0]
@ -824,9 +836,11 @@ class ObjectController(Controller):
self.container_name = lcontainer self.container_name = lcontainer
self.object_name = last_item['name'] self.object_name = last_item['name']
new_del_req = Request.blank(copy_path, environ=req.environ) new_del_req = Request.blank(copy_path, environ=req.environ)
(container_partition, containers, container_info = self.container_info(self.account_name,
_junk, new_del_req.acl, _junk, _junk) = \ self.container_name)
self.container_info(self.account_name, self.container_name) container_partition = container_info['partition']
containers = container_info['nodes']
new_del_req.acl = container_info['write_acl']
new_del_req.path_info = copy_path new_del_req.path_info = copy_path
req = new_del_req req = new_del_req
if 'swift.authorize' in req.environ: if 'swift.authorize' in req.environ:

@ -184,7 +184,7 @@ class TestRateLimit(unittest.TestCase):
'container_ratelimit_3': 200} 'container_ratelimit_3': 200}
fake_memcache = FakeMemcache() fake_memcache = FakeMemcache()
fake_memcache.store[get_container_memcache_key('a', 'c')] = \ fake_memcache.store[get_container_memcache_key('a', 'c')] = \
{'container_size': 5} {'count': 5}
the_app = ratelimit.RateLimitMiddleware(None, conf_dict, the_app = ratelimit.RateLimitMiddleware(None, conf_dict,
logger=FakeLogger()) logger=FakeLogger())
the_app.memcache_client = fake_memcache the_app.memcache_client = fake_memcache
@ -199,6 +199,19 @@ class TestRateLimit(unittest.TestCase):
self.assertEquals(len(the_app.get_ratelimitable_key_tuples( self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'PUT', 'a', 'c', 'o')), 1) 'PUT', 'a', 'c', 'o')), 1)
def test_ratelimit_old_memcache_format(self):
current_rate = 13
conf_dict = {'account_ratelimit': current_rate,
'container_ratelimit_3': 200}
fake_memcache = FakeMemcache()
fake_memcache.store[get_container_memcache_key('a', 'c')] = \
{'container_size': 5}
the_app = ratelimit.RateLimitMiddleware(None, conf_dict,
logger=FakeLogger())
the_app.memcache_client = fake_memcache
tuples = the_app.get_ratelimitable_key_tuples('PUT', 'a', 'c', 'o')
self.assertEquals(tuples, [('ratelimit/a/c', 200.0)])
def test_account_ratelimit(self): def test_account_ratelimit(self):
current_rate = 5 current_rate = 5
num_calls = 50 num_calls = 50

@ -502,10 +502,10 @@ class TestController(unittest.TestCase):
partition, nodes = self.container_ring.get_nodes(self.account, partition, nodes = self.container_ring.get_nodes(self.account,
self.container) self.container)
read_acl, write_acl = self.read_acl, self.write_acl read_acl, write_acl = self.read_acl, self.write_acl
self.assertEqual(partition, ret[0]) self.assertEqual(partition, ret['partition'])
self.assertEqual(nodes, ret[1]) self.assertEqual(nodes, ret['nodes'])
self.assertEqual(read_acl, ret[2]) self.assertEqual(read_acl, ret['read_acl'])
self.assertEqual(write_acl, ret[3]) self.assertEqual(write_acl, ret['write_acl'])
def test_container_info_invalid_account(self): def test_container_info_invalid_account(self):
def account_info(self, account, autocreate=False): def account_info(self, account, autocreate=False):