Refactored lists of nodes to contact for requests

Extensive refactor here to consolidate what nodes are contacted for
any request. This consolidation means reads will contact the same set
of nodes that writes would, giving a very good chance that
read-your-write behavior will succeed. This also means that writes
will not necessarily try all nodes in the cluster as it would
previously, which really wasn't desirable anyway. (If you really want
that, you can set request_node_count to a really big number, but
understand that also means reads will contact every node looking for
something that might not exist.)

* Added a request_node_count proxy-server conf value that allows
  control of how many nodes are contacted for a normal request.

In proxy.controllers.base.Controller:

* Got rid of error_increment since it was only used in one spot by
  another method and just served to confuse.

* Made error_occurred also log the device name.

* Made error_limit require an error message and also documented a bit
  better.

* Changed iter_nodes to just take a ring and a partition and yield
  all the nodes itself so it could control the number of nodes used
  in a given request. Also happens to consolidate where sort_nodes is
  called.

* Updated account_info and container_info to use all nodes from
  iter_nodes and to call error_occurred appropriately.

* Updated GETorHEAD_base to not track attempts on its own and just
  stop when iter_nodes tells it to stop. Also, it doesn't take the
  nodes to contact anymore; instead it takes the ring and gets the
  nodes from iter_nodes itself.

Elsewhere:

* Ring now has a get_part method.

* Made changes to reflect all of the above.

Change-Id: I37f76c99286b6456311abf25167cd0485bfcafac
This commit is contained in:
gholt 2013-04-06 01:35:58 +00:00
parent 48a94f39dd
commit d79a67ebf6
10 changed files with 180 additions and 105 deletions

View File

@ -669,6 +669,13 @@ rate_limit_after_segment 10 Rate limit the download of
this segment is downloaded.
rate_limit_segments_per_sec 1 Rate limit large object
downloads at this rate.
request_node_count 2 * replicas Set to the number of nodes to
contact for a normal request.
You can use '* replicas' at the
end to have it use the number
given times the number of
replicas for the ring being used
for the request.
============================ =============== =============================
[tempauth]

View File

@ -98,6 +98,10 @@ use = egg:swift#proxy
# as a regular object on GETs, i.e. will return that object's contents. Should
# be set to false if slo is not used in pipeline.
# allow_static_large_object = true
# Set to the number of nodes to contact for a normal request. You can use
# '* replicas' at the end to have it use the number given times the number of
# replicas for the ring being used for the request.
# request_node_count = 2 * replicas
[filter:tempauth]
use = egg:swift#tempauth

View File

@ -204,6 +204,21 @@ class Ring(object):
seen_ids.add(dev_id)
return part_nodes
def get_part(self, account, container=None, obj=None):
"""
Get the partition for an account/container/object.
:param account: account name
:param container: container name
:param obj: object name
:returns: the partition number
"""
# raw_digest=True presumably yields the digest as raw bytes so
# struct.unpack_from below can read it directly — TODO confirm
# against hash_path's definition.
key = hash_path(account, container, obj, raw_digest=True)
# Reload the ring data if the reload interval has elapsed.
if time() > self._rtime:
self._reload()
# High-order bits of the first 4 bytes of the hash select the part.
part = struct.unpack_from('>I', key)[0] >> self._part_shift
return part
def get_part_nodes(self, part):
"""
Get the nodes that are responsible for the partition. If one
@ -248,10 +263,7 @@ class Ring(object):
hardware description
====== ===============================================================
"""
key = hash_path(account, container, obj, raw_digest=True)
if time() > self._rtime:
self._reload()
part = struct.unpack_from('>I', key)[0] >> self._part_shift
part = self.get_part(account, container, obj)
return part, self._get_part_nodes(part)
def get_more_nodes(self, part):

View File

@ -48,10 +48,9 @@ class AccountController(Controller):
def GETorHEAD(self, req):
"""Handler for HTTP GET/HEAD requests."""
partition, nodes = self.app.account_ring.get_nodes(self.account_name)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Account'), partition, nodes, req.path_info.rstrip('/'),
len(nodes))
req, _('Account'), self.app.account_ring, partition,
req.path_info.rstrip('/'))
if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
resp = HTTPBadRequest(request=req)
@ -70,8 +69,8 @@ class AccountController(Controller):
self.account_name)
return resp
resp = self.GETorHEAD_base(
req, _('Account'), partition, nodes, req.path_info.rstrip('/'),
len(nodes))
req, _('Account'), self.app.account_ring, partition,
req.path_info.rstrip('/'))
return resp
@public

View File

@ -242,7 +242,7 @@ def get_account_info(env, app, swift_source=None):
cache = cache_from_env(env)
if not cache:
return None
(version, account, container, _) = \
(version, account, _junk, _junk) = \
split_path(env['PATH_INFO'], 2, 4, True)
cache_key = get_account_memcache_key(account)
# Use a unique environment cache key per account. If you copy this env
@ -295,15 +295,6 @@ class Controller(object):
if k.lower() in self.pass_through_headers or
k.lower().startswith(x_meta))
def error_increment(self, node):
"""
Handles incrementing error counts when talking to nodes.
:param node: dictionary of node to increment the error count for
"""
node['errors'] = node.get('errors', 0) + 1
node['last_error'] = time.time()
def error_occurred(self, node, msg):
"""
Handle logging, and handling of errors.
@ -311,10 +302,11 @@ class Controller(object):
:param node: dictionary of node to handle errors for
:param msg: error message
"""
self.error_increment(node)
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'),
node['errors'] = node.get('errors', 0) + 1
node['last_error'] = time.time()
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port']})
'port': node['port'], 'device': node['device']})
def exception_occurred(self, node, typ, additional_info):
"""
@ -352,14 +344,21 @@ class Controller(object):
_('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
return limited
def error_limit(self, node):
def error_limit(self, node, msg):
"""
Mark a node as error limited.
Mark a node as error limited. This immediately pretends the
node received enough errors to trigger error suppression. Use
this for errors like Insufficient Storage. For other errors
use :func:`error_occurred`.
:param node: dictionary of node to error limit
:param msg: error message
"""
node['errors'] = self.app.error_suppression_limit + 1
node['last_error'] = time.time()
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
def account_info(self, account, autocreate=False):
"""
@ -393,16 +392,9 @@ class Controller(object):
elif result_code == HTTP_NOT_FOUND and not autocreate:
return None, None, None
result_code = 0
attempts_left = len(nodes)
path = '/%s' % account
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
iternodes = self.iter_nodes(partition, nodes, self.app.account_ring)
while attempts_left > 0:
try:
node = iternodes.next()
except StopIteration:
break
attempts_left -= 1
for node in self.iter_nodes(self.app.account_ring, partition):
try:
start_node_timing = time.time()
with ConnectionTimeout(self.app.conn_timeout):
@ -412,7 +404,7 @@ class Controller(object):
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
resp.read()
body = resp.read()
if is_success(resp.status):
result_code = HTTP_OK
account_info.update(
@ -424,10 +416,16 @@ class Controller(object):
elif result_code != HTTP_NOT_FOUND:
result_code = -1
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
continue
else:
result_code = -1
if is_server_error(resp.status):
self.error_occurred(
node,
_('ERROR %(status)d %(body)s From Account '
'Server') %
{'status': resp.status, 'body': body[:1024]})
except (Exception, Timeout):
self.exception_occurred(node, _('Account'),
_('Trying to get account info for %s')
@ -497,9 +495,8 @@ class Controller(object):
return container_info
if not self.account_info(account, autocreate=account_autocreate)[1]:
return container_info
attempts_left = len(nodes)
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
for node in self.iter_nodes(part, nodes, self.app.container_ring):
for node in self.iter_nodes(self.app.container_ring, part):
try:
start_node_timing = time.time()
with ConnectionTimeout(self.app.conn_timeout):
@ -509,7 +506,7 @@ class Controller(object):
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
resp.read()
body = resp.read()
if is_success(resp.status):
container_info.update(
headers_to_container_info(resp.getheaders()))
@ -519,14 +516,16 @@ class Controller(object):
else:
container_info['status'] = -1
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(resp.status):
self.error_occurred(node, _(
'ERROR %(status)d %(body)s From Container '
'Server') %
{'status': resp.status, 'body': body[:1024]})
except (Exception, Timeout):
self.exception_occurred(
node, _('Container'),
_('Trying to get container info for %s') % path)
attempts_left -= 1
if attempts_left <= 0:
break
if self.app.memcache:
if container_info['status'] == HTTP_OK:
self.app.memcache.set(
@ -541,18 +540,25 @@ class Controller(object):
container_info['nodes'] = nodes
return container_info
def iter_nodes(self, partition, nodes, ring):
def iter_nodes(self, ring, partition):
"""
Node iterator that will first iterate over the normal nodes for a
partition and then the handoff partitions for the node.
Yields nodes for a ring partition, skipping over error
limited nodes and stopping at the configurable number of
nodes. If a node yielded subsequently gets error limited, an
extra node will be yielded to take its place.
:param partition: partition to iterate nodes for
:param nodes: list of node dicts from the ring
:param ring: ring to get handoff nodes from
:param ring: ring to get nodes from
:param partition: ring partition to yield nodes for
"""
for node in nodes:
primary_nodes = self.app.sort_nodes(ring.get_part_nodes(partition))
nodes_left = self.app.request_node_count(ring)
for node in primary_nodes:
if not self.error_limited(node):
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
handoffs = 0
for node in ring.get_more_nodes(partition):
if not self.error_limited(node):
@ -561,9 +567,13 @@ class Controller(object):
self.app.logger.increment('handoff_count')
self.app.logger.warning(
'Handoff requested (%d)' % handoffs)
if handoffs == len(nodes):
if handoffs == len(primary_nodes):
self.app.logger.increment('handoff_all_count')
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
def _make_request(self, nodes, part, method, path, headers, query,
logger_thread_locals):
@ -583,7 +593,7 @@ class Controller(object):
not is_server_error(resp.status):
return resp.status, resp.reason, resp.read()
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
except (Exception, Timeout):
self.exception_occurred(node, self.server_type,
_('Trying to %(method)s %(path)s') %
@ -601,7 +611,7 @@ class Controller(object):
:returns: a swob.Response object
"""
start_nodes = ring.get_part_nodes(part)
nodes = self.iter_nodes(part, start_nodes, ring)
nodes = self.iter_nodes(ring, part)
pile = GreenPile(len(start_nodes))
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,
@ -755,17 +765,15 @@ class Controller(object):
"""
return is_success(src.status) or is_redirection(src.status)
def GETorHEAD_base(self, req, server_type, partition, nodes, path,
attempts):
def GETorHEAD_base(self, req, server_type, ring, partition, path):
"""
Base handler for HTTP GET or HEAD requests.
:param req: swob.Request object
:param server_type: server type
:param ring: the ring to obtain nodes from
:param partition: partition
:param nodes: nodes
:param path: path for the request
:param attempts: number of attempts to try
:returns: swob.Response object
"""
statuses = []
@ -773,14 +781,7 @@ class Controller(object):
bodies = []
sources = []
newest = config_true_value(req.headers.get('x-newest', 'f'))
nodes = iter(nodes)
while len(statuses) < attempts:
try:
node = nodes.next()
except StopIteration:
break
if self.error_limited(node):
continue
for node in self.iter_nodes(ring, partition):
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
@ -811,7 +812,7 @@ class Controller(object):
statuses.append(possible_source.status)
reasons.append(possible_source.reason)
bodies.append('')
sources.append(possible_source)
sources.append((possible_source, node))
if not newest: # one good source is enough
break
else:
@ -819,7 +820,7 @@ class Controller(object):
reasons.append(possible_source.reason)
bodies.append(possible_source.read())
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.error_occurred(node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
@ -827,9 +828,9 @@ class Controller(object):
'body': bodies[-1][:1024],
'type': server_type})
if sources:
sources.sort(key=source_key)
source = sources.pop()
for src in sources:
sources.sort(key=lambda s: source_key(s[0]))
source, node = sources.pop()
for src, _junk in sources:
self.close_swift_conn(src)
res = Response(request=req, conditional_response=True)
if req.method == 'GET' and \

View File

@ -66,11 +66,10 @@ class ContainerController(Controller):
"""Handler for HTTP GET/HEAD requests."""
if not self.account_info(self.account_name)[1]:
return HTTPNotFound(request=req)
part, nodes = self.app.container_ring.get_nodes(
part = self.app.container_ring.get_part(
self.account_name, self.container_name)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Container'), part, nodes, req.path_info, len(nodes))
req, _('Container'), self.app.container_ring, part, req.path_info)
if self.app.memcache:
# set the memcache container size for ratelimiting
cache_key = get_container_memcache_key(self.account_name,

View File

@ -140,7 +140,7 @@ class SegmentedIterable(object):
self.segment_dict['name'].lstrip('/').split('/', 1)
else:
container, obj = self.container, self.segment_dict['name']
partition, nodes = self.controller.app.object_ring.get_nodes(
partition = self.controller.app.object_ring.get_part(
self.controller.account_name, container, obj)
path = '/%s/%s/%s' % (self.controller.account_name, container, obj)
req = Request.blank(path)
@ -152,12 +152,9 @@ class SegmentedIterable(object):
sleep(max(self.next_get_time - time.time(), 0))
self.next_get_time = time.time() + \
1.0 / self.controller.app.rate_limit_segments_per_sec
nodes = self.controller.app.sort_nodes(nodes)
resp = self.controller.GETorHEAD_base(
req, _('Object'), partition,
self.controller.iter_nodes(partition, nodes,
self.controller.app.object_ring),
path, len(nodes))
req, _('Object'), self.controller.app.object_ring, partition,
path)
if self.is_slo and resp.status_int == HTTP_NOT_FOUND:
raise SloSegmentError(_(
'Could not load object segment %(path)s:'
@ -309,7 +306,7 @@ class ObjectController(Controller):
yield item
def _listing_pages_iter(self, lcontainer, lprefix, env):
lpartition, lnodes = self.app.container_ring.get_nodes(
lpartition = self.app.container_ring.get_part(
self.account_name, lcontainer)
marker = ''
while True:
@ -321,10 +318,9 @@ class ObjectController(Controller):
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
lnodes = self.app.sort_nodes(lnodes)
lresp = self.GETorHEAD_base(
lreq, _('Container'), lpartition, lnodes, lreq.path_info,
len(lnodes))
lreq, _('Container'), self.app.container_ring, lpartition,
lreq.path_info)
if 'swift.authorize' in env:
lreq.acl = lresp.headers.get('x-container-read')
aresp = env['swift.authorize'](lreq)
@ -385,13 +381,10 @@ class ObjectController(Controller):
if aresp:
return aresp
partition, nodes = self.app.object_ring.get_nodes(
partition = self.app.object_ring.get_part(
self.account_name, self.container_name, self.object_name)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, len(nodes))
req, _('Object'), self.app.object_ring, partition, req.path_info)
if ';' in resp.headers.get('content-type', ''):
# strip off swift_bytes from content-type
@ -424,11 +417,9 @@ class ObjectController(Controller):
new_req = req.copy_get()
new_req.method = 'GET'
new_req.range = None
nodes = self.app.sort_nodes(nodes)
new_resp = self.GETorHEAD_base(
new_req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, len(nodes))
new_req, _('Object'), self.app.object_ring, partition,
req.path_info)
if new_resp.status_int // 100 == 2:
try:
listing = json.loads(new_resp.body)
@ -685,7 +676,7 @@ class ObjectController(Controller):
conn.node = node
return conn
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node)
self.error_limit(node, _('ERROR Insufficient Storage'))
except:
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % path)
@ -744,8 +735,9 @@ class ObjectController(Controller):
req.environ.get('swift_versioned_copy')):
hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
environ={'REQUEST_METHOD': 'HEAD'})
hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
hreq.path_info, len(nodes))
hresp = self.GETorHEAD_base(
hreq, _('Object'), self.app.object_ring, partition,
hreq.path_info)
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
@ -867,7 +859,7 @@ class ObjectController(Controller):
source_resp.headers['X-Static-Large-Object']
req = new_req
node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
node_iter = self.iter_nodes(self.app.object_ring, partition)
pile = GreenPile(len(nodes))
chunked = req.headers.get('transfer-encoding')

View File

@ -115,6 +115,16 @@ class Application(object):
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
self.allow_static_large_object = config_true_value(
conf.get('allow_static_large_object', 'true'))
value = conf.get('request_node_count', '2 * replicas').lower().split()
if len(value) == 1:
value = int(value[0])
self.request_node_count = lambda r: value
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
value = int(value[0])
self.request_node_count = lambda r: value * r.replica_count
else:
raise ValueError(
'Invalid request_node_count value: %r' % ''.join(value))
def get_controller(self, path):
"""

View File

@ -201,6 +201,13 @@ class TestRing(unittest.TestCase):
self.assertEquals(len(self.ring.devs), 9)
self.assertNotEquals(self.ring._mtime, orig_mtime)
def test_get_part(self):
# The new get_part method must agree with the partition (and thus
# the nodes) returned by the pre-existing get_nodes for the same key.
part1 = self.ring.get_part('a')
nodes1 = self.ring.get_part_nodes(part1)
part2, nodes2 = self.ring.get_nodes('a')
self.assertEquals(part1, part2)
self.assertEquals(nodes1, nodes2)
def test_get_part_nodes(self):
part, nodes = self.ring.get_nodes('a')
self.assertEquals(nodes, self.ring.get_part_nodes(part))

View File

@ -21,7 +21,7 @@ import sys
import unittest
import urlparse
import signal
from contextlib import contextmanager
from contextlib import contextmanager, nested
from gzip import GzipFile
from shutil import rmtree
import time
@ -30,6 +30,7 @@ from hashlib import md5
from tempfile import mkdtemp
import random
import mock
from eventlet import sleep, spawn, wsgi, listen
import simplejson
@ -204,6 +205,13 @@ class FakeRing(object):
self.replicas = replicas
self.devs = {}
@property
def replica_count(self):
# Exposes the replica count the way the real Ring does, so the
# request_node_count callables (e.g. lambda r: 2 * r.replica_count)
# work against FakeRing in tests.
return self.replicas
def get_part(self, account, container=None, obj=None):
# FakeRing maps every name to partition 1 for test determinism.
return 1
def get_nodes(self, account, container=None, obj=None):
devs = []
for x in xrange(self.replicas):
@ -1872,12 +1880,13 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.app.object_ring.max_more_nodes = 20
self.app.request_node_count = lambda r: 20
controller = proxy_server.ObjectController(self.app, 'account',
'container',
'object')
@ -1885,8 +1894,8 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 9)
@ -1900,8 +1909,8 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(
@ -1919,14 +1928,49 @@ class TestObjectController(unittest.TestCase):
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(partition, nodes,
self.app.object_ring):
for node in controller.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(self.app.logger.log_dict['warning'], [])
finally:
self.app.object_ring.max_more_nodes = 0
def test_iter_nodes_calls_sort_nodes(self):
# iter_nodes now owns the sort_nodes call; verify the primary nodes
# are passed through app.sort_nodes exactly once per iteration.
with mock.patch.object(self.app, 'sort_nodes') as sort_nodes:
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
for node in controller.iter_nodes(self.app.object_ring, 0):
pass
sort_nodes.assert_called_once_with(
self.app.object_ring.get_part_nodes(0))
def test_iter_nodes_skips_error_limited(self):
# Stub sort_nodes to identity so node ordering is deterministic.
with mock.patch.object(self.app, 'sort_nodes', lambda n: n):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
self.assertTrue(first_nodes[0] in second_nodes)
# Once a node is error limited, subsequent iter_nodes calls must
# skip it entirely.
controller.error_limit(first_nodes[0], 'test')
second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
self.assertTrue(first_nodes[0] not in second_nodes)
def test_iter_nodes_gives_extra_if_error_limited_inline(self):
# With request_node_count pinned at 6 and plenty of handoffs
# available, error limiting a node while the iterator is live
# should make iter_nodes yield one extra replacement node.
with nested(
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
mock.patch.object(self.app, 'request_node_count',
lambda r: 6),
mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
second_nodes = []
for node in controller.iter_nodes(self.app.object_ring, 0):
if not second_nodes:
# Limit the first yielded node mid-iteration.
controller.error_limit(node, 'test')
second_nodes.append(node)
self.assertEquals(len(first_nodes), 6)
self.assertEquals(len(second_nodes), 7)
def test_best_response_sets_etag(self):
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
@ -5425,8 +5469,8 @@ class FakeObjectController(object):
resp = Response(app_iter=iter(body))
return resp
def iter_nodes(self, partition, nodes, ring):
for node in nodes:
def iter_nodes(self, ring, partition):
for node in ring.get_part_nodes(partition):
yield node
for node in ring.get_more_nodes(partition):
yield node