Merge "Refactored lists of nodes to contact for requests"

Jenkins 2013-04-15 23:05:47 +00:00 committed by Gerrit Code Review
commit c87576bb94
10 changed files with 180 additions and 105 deletions

View File

@@ -669,6 +669,13 @@ rate_limit_after_segment 10 Rate limit the download of
                                             this segment is downloaded.
rate_limit_segments_per_sec  1               Rate limit large object
                                             downloads at this rate.
request_node_count           2 * replicas    Set to the number of nodes to
                                             contact for a normal request.
                                             You can use '* replicas' at the
                                             end to have it use the number
                                             given times the number of
                                             replicas for the ring being used
                                             for the request.
============================ =============== =============================
[tempauth]
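To make the option's semantics concrete, here is a minimal, illustrative sketch (not part of this commit's diff) of how a request_node_count value maps to an actual node count. The names parse_request_node_count and StubRing are hypothetical; replica_count mirrors the ring attribute the proxy relies on, and the real parsing added by this commit appears in Application.__init__ further down.

# Illustrative sketch only, not the code this commit adds.
def parse_request_node_count(value):
    parts = value.lower().split()
    if len(parts) == 1:
        count = int(parts[0])
        return lambda ring: count
    if len(parts) == 3 and parts[1] == '*' and parts[2] == 'replicas':
        multiplier = int(parts[0])
        return lambda ring: multiplier * ring.replica_count
    raise ValueError('Invalid request_node_count value: %r' % value)

class StubRing(object):
    replica_count = 3

request_node_count = parse_request_node_count('2 * replicas')
print(request_node_count(StubRing()))  # -> 6: up to six nodes per request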

View File

@@ -103,6 +103,10 @@ use = egg:swift#proxy
# as a regular object on GETs, i.e. will return that object's contents. Should
# be set to false if slo is not used in pipeline.
# allow_static_large_object = true
# Set to the number of nodes to contact for a normal request. You can use
# '* replicas' at the end to have it use the number given times the number of
# replicas for the ring being used for the request.
# request_node_count = 2 * replicas
[filter:tempauth]
use = egg:swift#tempauth

View File

@@ -204,6 +204,21 @@ class Ring(object):
seen_ids.add(dev_id)
return part_nodes
def get_part(self, account, container=None, obj=None):
"""
Get the partition for an account/container/object.
:param account: account name
:param container: container name
:param obj: object name
:returns: the partition number
"""
key = hash_path(account, container, obj, raw_digest=True)
if time() > self._rtime:
self._reload()
part = struct.unpack_from('>I', key)[0] >> self._part_shift
return part
def get_part_nodes(self, part):
"""
Get the nodes that are responsible for the partition. If one
@@ -248,10 +263,7 @@ class Ring(object):
hardware description
====== ===============================================================
"""
key = hash_path(account, container, obj, raw_digest=True)
if time() > self._rtime:
self._reload()
part = struct.unpack_from('>I', key)[0] >> self._part_shift
part = self.get_part(account, container, obj)
return part, self._get_part_nodes(part)
def get_more_nodes(self, part):
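A brief usage sketch of the split-out API: lookup_primaries below is a hypothetical helper, not something this commit adds, and it simply restates the equivalence that the new test_get_part unit test asserts near the end of this commit.

def lookup_primaries(ring, account, container=None, obj=None):
    # Same result as ring.get_nodes(account, container, obj), but callers
    # that only need the partition (e.g. to feed iter_nodes) can stop after
    # get_part() and skip building the node list.
    part = ring.get_part(account, container, obj)
    return part, ring.get_part_nodes(part)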

View File

@@ -48,10 +48,9 @@ class AccountController(Controller):
def GETorHEAD(self, req):
"""Handler for HTTP GET/HEAD requests."""
partition, nodes = self.app.account_ring.get_nodes(self.account_name)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Account'), partition, nodes, req.path_info.rstrip('/'),
len(nodes))
req, _('Account'), self.app.account_ring, partition,
req.path_info.rstrip('/'))
if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
resp = HTTPBadRequest(request=req)
@@ -70,8 +69,8 @@ class AccountController(Controller):
self.account_name)
return resp
resp = self.GETorHEAD_base(
req, _('Account'), partition, nodes, req.path_info.rstrip('/'),
len(nodes))
req, _('Account'), self.app.account_ring, partition,
req.path_info.rstrip('/'))
return resp
@public

View File

@@ -242,7 +242,7 @@ def get_account_info(env, app, swift_source=None):
cache = cache_from_env(env)
if not cache:
return None
(version, account, container, _) = \
(version, account, _junk, _junk) = \
split_path(env['PATH_INFO'], 2, 4, True)
cache_key = get_account_memcache_key(account)
# Use a unique environment cache key per account. If you copy this env
@@ -295,15 +295,6 @@ class Controller(object):
if k.lower() in self.pass_through_headers or
k.lower().startswith(x_meta))
def error_increment(self, node):
"""
Handles incrementing error counts when talking to nodes.
:param node: dictionary of node to increment the error count for
"""
node['errors'] = node.get('errors', 0) + 1
node['last_error'] = time.time()
def error_occurred(self, node, msg):
"""
Handle logging, and handling of errors.
@@ -311,10 +302,11 @@ class Controller(object):
:param node: dictionary of node to handle errors for
:param msg: error message
"""
self.error_increment(node)
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'),
node['errors'] = node.get('errors', 0) + 1
node['last_error'] = time.time()
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port']})
'port': node['port'], 'device': node['device']})
def exception_occurred(self, node, typ, additional_info):
"""
@@ -352,14 +344,21 @@ class Controller(object):
_('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
return limited
def error_limit(self, node):
def error_limit(self, node, msg):
"""
Mark a node as error limited.
Mark a node as error limited. This immediately pretends the
node received enough errors to trigger error suppression. Use
this for errors like Insufficient Storage. For other errors
use :func:`error_occurred`.
:param node: dictionary of node to error limit
:param msg: error message
"""
node['errors'] = self.app.error_suppression_limit + 1
node['last_error'] = time.time()
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
def account_info(self, account, autocreate=False):
"""
@@ -393,16 +392,9 @@ class Controller(object):
elif result_code == HTTP_NOT_FOUND and not autocreate:
return None, None, None
result_code = 0
attempts_left = len(nodes)
path = '/%s' % account
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
iternodes = self.iter_nodes(partition, nodes, self.app.account_ring)
while attempts_left > 0:
try:
node = iternodes.next()
except StopIteration:
break
attempts_left -= 1
for node in self.iter_nodes(self.app.account_ring, partition):
try:
start_node_timing = time.time()
with ConnectionTimeout(self.app.conn_timeout):
@@ -412,7 +404,7 @@ class Controller(object):
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
resp.read()
body = resp.read()
if is_success(resp.status):
result_code = HTTP_OK
account_info.update(
@@ -424,10 +416,16 @@ class Controller(object):
elif result_code != HTTP_NOT_FOUND:
result_code = -1
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
continue
else:
result_code = -1
if is_server_error(resp.status):
self.error_occurred(
node,
_('ERROR %(status)d %(body)s From Account '
'Server') %
{'status': resp.status, 'body': body[:1024]})
except (Exception, Timeout):
self.exception_occurred(node, _('Account'),
_('Trying to get account info for %s')
@@ -497,9 +495,8 @@ class Controller(object):
return container_info
if not self.account_info(account, autocreate=account_autocreate)[1]:
return container_info
attempts_left = len(nodes)
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
for node in self.iter_nodes(part, nodes, self.app.container_ring):
for node in self.iter_nodes(self.app.container_ring, part):
try:
start_node_timing = time.time()
with ConnectionTimeout(self.app.conn_timeout):
@@ -509,7 +506,7 @@ class Controller(object):
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
resp.read()
body = resp.read()
if is_success(resp.status):
container_info.update(
headers_to_container_info(resp.getheaders()))
@@ -519,14 +516,16 @@ class Controller(object):
else:
container_info['status'] = -1
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(resp.status):
self.error_occurred(node, _(
'ERROR %(status)d %(body)s From Container '
'Server') %
{'status': resp.status, 'body': body[:1024]})
except (Exception, Timeout):
self.exception_occurred(
node, _('Container'),
_('Trying to get container info for %s') % path)
attempts_left -= 1
if attempts_left <= 0:
break
if self.app.memcache:
if container_info['status'] == HTTP_OK:
self.app.memcache.set(
@@ -541,18 +540,25 @@ class Controller(object):
container_info['nodes'] = nodes
return container_info
def iter_nodes(self, partition, nodes, ring):
def iter_nodes(self, ring, partition):
"""
Node iterator that will first iterate over the normal nodes for a
partition and then the handoff partitions for the node.
Yields nodes for a ring partition, skipping over error
limited nodes and stopping at the configurable number of
nodes. If a node yielded subsequently gets error limited, an
extra node will be yielded to take its place.
:param partition: partition to iterate nodes for
:param nodes: list of node dicts from the ring
:param ring: ring to get handoff nodes from
:param ring: ring to get yield nodes from
:param partition: ring partition to yield nodes for
"""
for node in nodes:
primary_nodes = self.app.sort_nodes(ring.get_part_nodes(partition))
nodes_left = self.app.request_node_count(ring)
for node in primary_nodes:
if not self.error_limited(node):
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
handoffs = 0
for node in ring.get_more_nodes(partition):
if not self.error_limited(node):
@@ -561,9 +567,13 @@ class Controller(object):
self.app.logger.increment('handoff_count')
self.app.logger.warning(
'Handoff requested (%d)' % handoffs)
if handoffs == len(nodes):
if handoffs == len(primary_nodes):
self.app.logger.increment('handoff_all_count')
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
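The behavior the iter_nodes docstring describes can be pictured with a small stand-alone sketch. It is illustrative only; node_iter and the string node names are hypothetical, and the method above is the actual implementation.

# Simplified model of the contract: yield primaries then handoffs, skip
# error limited nodes, stop after `limit` nodes, and don't charge the
# budget for a node that got error limited right after it was yielded.
def node_iter(primaries, handoffs, limit, error_limited):
    remaining = limit
    for node in list(primaries) + list(handoffs):
        if error_limited(node):
            continue
        yield node
        if not error_limited(node):   # re-check once the caller is done
            remaining -= 1
        if remaining <= 0:
            return

suppressed = set()
nodes = node_iter(['p1', 'p2', 'p3'], ['h1', 'h2'], 3,
                  lambda n: n in suppressed)
first = next(nodes)        # 'p1'
suppressed.add(first)      # caller error limits the node it just used
print([first] + list(nodes))   # ['p1', 'p2', 'p3', 'h1'] -- one extra node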
def _make_request(self, nodes, part, method, path, headers, query,
logger_thread_locals):
@@ -583,7 +593,7 @@ class Controller(object):
not is_server_error(resp.status):
return resp.status, resp.reason, resp.read()
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
except (Exception, Timeout):
self.exception_occurred(node, self.server_type,
_('Trying to %(method)s %(path)s') %
@@ -601,7 +611,7 @@ class Controller(object):
:returns: a swob.Response object
"""
start_nodes = ring.get_part_nodes(part)
nodes = self.iter_nodes(part, start_nodes, ring)
nodes = self.iter_nodes(ring, part)
pile = GreenPile(len(start_nodes))
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,
@@ -755,17 +765,15 @@ class Controller(object):
"""
return is_success(src.status) or is_redirection(src.status)
def GETorHEAD_base(self, req, server_type, partition, nodes, path,
attempts):
def GETorHEAD_base(self, req, server_type, ring, partition, path):
"""
Base handler for HTTP GET or HEAD requests.
:param req: swob.Request object
:param server_type: server type
:param ring: the ring to obtain nodes from
:param partition: partition
:param nodes: nodes
:param path: path for the request
:param attempts: number of attempts to try
:returns: swob.Response object
"""
statuses = []
@@ -773,14 +781,7 @@ class Controller(object):
bodies = []
sources = []
newest = config_true_value(req.headers.get('x-newest', 'f'))
nodes = iter(nodes)
while len(statuses) < attempts:
try:
node = nodes.next()
except StopIteration:
break
if self.error_limited(node):
continue
for node in self.iter_nodes(ring, partition):
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
@@ -811,7 +812,7 @@ class Controller(object):
statuses.append(possible_source.status)
reasons.append(possible_source.reason)
bodies.append('')
sources.append(possible_source)
sources.append((possible_source, node))
if not newest: # one good source is enough
break
else:
@@ -819,7 +820,7 @@ class Controller(object):
reasons.append(possible_source.reason)
bodies.append(possible_source.read())
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.error_occurred(node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
@@ -827,9 +828,9 @@ class Controller(object):
'body': bodies[-1][:1024],
'type': server_type})
if sources:
sources.sort(key=source_key)
source = sources.pop()
for src in sources:
sources.sort(key=lambda s: source_key(s[0]))
source, node = sources.pop()
for src, _junk in sources:
self.close_swift_conn(src)
res = Response(request=req, conditional_response=True)
if req.method == 'GET' and \

View File

@@ -66,11 +66,10 @@ class ContainerController(Controller):
"""Handler for HTTP GET/HEAD requests."""
if not self.account_info(self.account_name)[1]:
return HTTPNotFound(request=req)
part, nodes = self.app.container_ring.get_nodes(
part = self.app.container_ring.get_part(
self.account_name, self.container_name)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Container'), part, nodes, req.path_info, len(nodes))
req, _('Container'), self.app.container_ring, part, req.path_info)
if self.app.memcache:
# set the memcache container size for ratelimiting
cache_key = get_container_memcache_key(self.account_name,

View File

@@ -140,7 +140,7 @@ class SegmentedIterable(object):
self.segment_dict['name'].lstrip('/').split('/', 1)
else:
container, obj = self.container, self.segment_dict['name']
partition, nodes = self.controller.app.object_ring.get_nodes(
partition = self.controller.app.object_ring.get_part(
self.controller.account_name, container, obj)
path = '/%s/%s/%s' % (self.controller.account_name, container, obj)
req = Request.blank(path)
@@ -152,12 +152,9 @@ class SegmentedIterable(object):
sleep(max(self.next_get_time - time.time(), 0))
self.next_get_time = time.time() + \
1.0 / self.controller.app.rate_limit_segments_per_sec
nodes = self.controller.app.sort_nodes(nodes)
resp = self.controller.GETorHEAD_base(
req, _('Object'), partition,
self.controller.iter_nodes(partition, nodes,
self.controller.app.object_ring),
path, len(nodes))
req, _('Object'), self.controller.app.object_ring, partition,
path)
if self.is_slo and resp.status_int == HTTP_NOT_FOUND:
raise SloSegmentError(_(
'Could not load object segment %(path)s:'
@@ -309,7 +306,7 @@ class ObjectController(Controller):
yield item
def _listing_pages_iter(self, lcontainer, lprefix, env):
lpartition, lnodes = self.app.container_ring.get_nodes(
lpartition = self.app.container_ring.get_part(
self.account_name, lcontainer)
marker = ''
while True:
@@ -321,10 +318,9 @@ class ObjectController(Controller):
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
lnodes = self.app.sort_nodes(lnodes)
lresp = self.GETorHEAD_base(
lreq, _('Container'), lpartition, lnodes, lreq.path_info,
len(lnodes))
lreq, _('Container'), self.app.container_ring, lpartition,
lreq.path_info)
if 'swift.authorize' in env:
lreq.acl = lresp.headers.get('x-container-read')
aresp = env['swift.authorize'](lreq)
@@ -385,13 +381,10 @@ class ObjectController(Controller):
if aresp:
return aresp
partition, nodes = self.app.object_ring.get_nodes(
partition = self.app.object_ring.get_part(
self.account_name, self.container_name, self.object_name)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, len(nodes))
req, _('Object'), self.app.object_ring, partition, req.path_info)
if ';' in resp.headers.get('content-type', ''):
# strip off swift_bytes from content-type
@@ -425,11 +418,9 @@ class ObjectController(Controller):
new_req = req.copy_get()
new_req.method = 'GET'
new_req.range = None
nodes = self.app.sort_nodes(nodes)
new_resp = self.GETorHEAD_base(
new_req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, len(nodes))
new_req, _('Object'), self.app.object_ring, partition,
req.path_info)
if new_resp.status_int // 100 == 2:
try:
listing = json.loads(new_resp.body)
@@ -686,7 +677,7 @@ class ObjectController(Controller):
conn.node = node
return conn
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
except:
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % path)
@@ -745,8 +736,9 @@ class ObjectController(Controller):
req.environ.get('swift_versioned_copy')):
hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
environ={'REQUEST_METHOD': 'HEAD'})
hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
hreq.path_info, len(nodes))
hresp = self.GETorHEAD_base(
hreq, _('Object'), self.app.object_ring, partition,
hreq.path_info)
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
@@ -868,7 +860,7 @@ class ObjectController(Controller):
source_resp.headers['X-Static-Large-Object']
req = new_req
node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
node_iter = self.iter_nodes(self.app.object_ring, partition)
pile = GreenPile(len(nodes))
chunked = req.headers.get('transfer-encoding')

View File

@@ -116,6 +116,16 @@ class Application(object):
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
self.allow_static_large_object = config_true_value(
conf.get('allow_static_large_object', 'true'))
value = conf.get('request_node_count', '2 * replicas').lower().split()
if len(value) == 1:
value = int(value[0])
self.request_node_count = lambda r: value
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
value = int(value[0])
self.request_node_count = lambda r: value * r.replica_count
else:
raise ValueError(
'Invalid request_node_count value: %r' % ''.join(value))
def get_controller(self, path):
"""

View File

@@ -201,6 +201,13 @@ class TestRing(unittest.TestCase):
self.assertEquals(len(self.ring.devs), 9)
self.assertNotEquals(self.ring._mtime, orig_mtime)
def test_get_part(self):
part1 = self.ring.get_part('a')
nodes1 = self.ring.get_part_nodes(part1)
part2, nodes2 = self.ring.get_nodes('a')
self.assertEquals(part1, part2)
self.assertEquals(nodes1, nodes2)
def test_get_part_nodes(self):
part, nodes = self.ring.get_nodes('a')
self.assertEquals(nodes, self.ring.get_part_nodes(part))

View File

@@ -21,7 +21,7 @@ import sys
import unittest
import urlparse
import signal
from contextlib import contextmanager
from contextlib import contextmanager, nested
from gzip import GzipFile
from shutil import rmtree
import time
@@ -30,6 +30,7 @@ from hashlib import md5
from tempfile import mkdtemp
import random
import mock
from eventlet import sleep, spawn, wsgi, listen
import simplejson
@@ -204,6 +205,13 @@ class FakeRing(object):
self.replicas = replicas
self.devs = {}
@property
def replica_count(self):
return self.replicas
def get_part(self, account, container=None, obj=None):
return 1
def get_nodes(self, account, container=None, obj=None):
devs = []
for x in xrange(self.replicas):
@@ -1922,12 +1930,13 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.app.object_ring.max_more_nodes = 20
self.app.request_node_count = lambda r: 20
controller = proxy_server.ObjectController(self.app, 'account',
'container',
'object')
@@ -1935,8 +1944,8 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 9)
@@ -1950,8 +1959,8 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(
@@ -1969,14 +1978,49 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(self.app.logger.log_dict['warning'], [])
finally:
self.app.object_ring.max_more_nodes = 0
def test_iter_nodes_calls_sort_nodes(self):
with mock.patch.object(self.app, 'sort_nodes') as sort_nodes:
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
for node in controller.iter_nodes(self.app.object_ring, 0):
pass
sort_nodes.assert_called_once_with(
self.app.object_ring.get_part_nodes(0))
def test_iter_nodes_skips_error_limited(self):
with mock.patch.object(self.app, 'sort_nodes', lambda n: n):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
self.assertTrue(first_nodes[0] in second_nodes)
controller.error_limit(first_nodes[0], 'test')
second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
self.assertTrue(first_nodes[0] not in second_nodes)
def test_iter_nodes_gives_extra_if_error_limited_inline(self):
with nested(
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
mock.patch.object(self.app, 'request_node_count',
lambda r: 6),
mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
second_nodes = []
for node in controller.iter_nodes(self.app.object_ring, 0):
if not second_nodes:
controller.error_limit(node, 'test')
second_nodes.append(node)
self.assertEquals(len(first_nodes), 6)
self.assertEquals(len(second_nodes), 7)
def test_best_response_sets_etag(self):
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
@@ -5475,8 +5519,8 @@ class FakeObjectController(object):
resp = Response(app_iter=iter(body))
return resp
def iter_nodes(self, partition, nodes, ring):
for node in nodes:
def iter_nodes(self, ring, partition):
for node in ring.get_part_nodes(partition):
yield node
for node in ring.get_more_nodes(partition):
yield node