From 69917347cf87a1f50ef65338b99fc4a3a1ccfee7 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Thu, 7 Feb 2013 22:07:18 -0800 Subject: [PATCH] timing-based affinity sorting for primary replicas This changes the way primary replicas can be sorted on GET requests. Previously, replicas were shuffled. Now, if configured, the replicas are sorted based on the most recent connection time data to that node. This patch adds a config value that changes the sorting method. get_more_nodes() (ie handoffs) is unaffected by this patch because sorting by affinity would break the durability provided by the current as-unique-as-possible handoff selection. Timing data is collected for each node each time the proxy makes a connection to that node (IP address). If timing data for a node doesn't exist, then it is assumed at -1 (ie will sort earlier) so that timing data can be collected for that node. Change-Id: I837fa21c3a566b10cce33eb75788665e1d01cd8a --- etc/proxy-server.conf-sample | 7 ++++ swift/proxy/controllers/account.py | 3 +- swift/proxy/controllers/base.py | 8 ++++ swift/proxy/controllers/container.py | 3 +- swift/proxy/controllers/obj.py | 9 +++-- swift/proxy/server.py | 32 +++++++++++++++ test/unit/proxy/test_server.py | 58 ++++++++++++++++++++++------ 7 files changed, 100 insertions(+), 20 deletions(-) diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 3f5ddcec09..164ddf1071 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -87,6 +87,13 @@ use = egg:swift#proxy # Once segment rate-limiting kicks in for an object, limit segments served # to N per second. # rate_limit_segments_per_sec = 1 +# Storage nodes can be chosen at random (shuffle) or by using timing +# measurements. Using timing measurements may allow for lower overall latency. +# The valid values for sorting_method are "shuffle" and "timing" +# sorting_method = shuffle +# If the timing sorting_method is used, the timings will only be valid for +# the number of seconds configured by timing_expiry. +# timing_expiry = 300 [filter:tempauth] use = egg:swift#tempauth diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index 4237c5cfe8..adb23733ff 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -26,7 +26,6 @@ import time from urllib import unquote -from random import shuffle from swift.common.utils import normalize_timestamp, public from swift.common.constraints import check_metadata, MAX_ACCOUNT_NAME_LENGTH @@ -49,7 +48,7 @@ class AccountController(Controller): def GETorHEAD(self, req): """Handler for HTTP GET/HEAD requests.""" partition, nodes = self.app.account_ring.get_nodes(self.account_name) - shuffle(nodes) + nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( req, _('Account'), partition, nodes, req.path_info.rstrip('/'), len(nodes)) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index f4ed3f5af6..7ef1d580e4 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -353,10 +353,12 @@ class Controller(object): break attempts_left -= 1 try: + start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], partition, 'HEAD', path, headers) + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() resp.read() @@ -444,10 +446,12 @@ class Controller(object): headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} for node in self.iter_nodes(part, nodes, self.app.container_ring): try: + start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], part, 'HEAD', path, headers) + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() resp.read() @@ -511,11 +515,13 @@ class Controller(object): self.app.logger.thread_locals = logger_thread_locals for node in nodes: try: + start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], part, method, path, headers=headers, query_string=query) conn.node = node + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() if not is_informational(resp.status) and \ @@ -720,6 +726,7 @@ class Controller(object): break if self.error_limited(node): continue + start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): headers = dict(req.headers) @@ -728,6 +735,7 @@ class Controller(object): node['ip'], node['port'], node['device'], partition, req.method, path, headers=headers, query_string=req.query_string) + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): possible_source = conn.getresponse() # See NOTE: swift_conn at top of file about this. diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 90a8861d56..61d1aa1198 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -26,7 +26,6 @@ import time from urllib import unquote -from random import shuffle from swift.common.utils import normalize_timestamp, public, csv_append from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH @@ -69,7 +68,7 @@ class ContainerController(Controller): return HTTPNotFound(request=req) part, nodes = self.app.container_ring.get_nodes( self.account_name, self.container_name) - shuffle(nodes) + nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( req, _('Container'), part, nodes, req.path_info, len(nodes)) if self.app.memcache: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 30225c93f9..53188e65ef 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -31,7 +31,6 @@ import time from datetime import datetime from urllib import unquote, quote from hashlib import md5 -from random import shuffle from eventlet import sleep, GreenPile from eventlet.queue import Queue @@ -123,7 +122,7 @@ class SegmentedIterable(object): sleep(max(self.next_get_time - time.time(), 0)) self.next_get_time = time.time() + \ 1.0 / self.controller.app.rate_limit_segments_per_sec - shuffle(nodes) + nodes = self.controller.app.sort_nodes(nodes) resp = self.controller.GETorHEAD_base( req, _('Object'), partition, self.controller.iter_nodes(partition, nodes, @@ -271,7 +270,7 @@ class ObjectController(Controller): lreq.environ['QUERY_STRING'] = \ 'format=json&prefix=%s&marker=%s' % (quote(lprefix), quote(marker)) - shuffle(lnodes) + nodes = self.app.sort_nodes(lnodes) lresp = self.GETorHEAD_base( lreq, _('Container'), lpartition, lnodes, lreq.path_info, len(lnodes)) @@ -337,7 +336,7 @@ class ObjectController(Controller): partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) - shuffle(nodes) + nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( req, _('Object'), partition, self.iter_nodes(partition, nodes, self.app.object_ring), @@ -558,10 +557,12 @@ class ObjectController(Controller): self.app.logger.thread_locals = logger_thread_locals for node in nodes: try: + start_time = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect( node['ip'], node['port'], node['device'], part, 'PUT', path, headers) + self.app.set_node_timing(node, time.time() - start_time) with Timeout(self.app.node_timeout): resp = conn.getexpect() if resp.status == HTTP_CONTINUE: diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 40234276d0..e73c91b63f 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -28,6 +28,8 @@ import mimetypes import os from ConfigParser import ConfigParser import uuid +from random import shuffle +from time import time from eventlet import Timeout @@ -108,6 +110,9 @@ class Application(object): a.strip() for a in conf.get('cors_allow_origin', '').split(',') if a.strip()] + self.node_timings = {} + self.timing_expiry = int(conf.get('timing_expiry', 300)) + self.sorting_method = conf.get('sorting_method', 'shuffle').lower() def get_controller(self, path): """ @@ -242,6 +247,33 @@ class Application(object): self.logger.exception(_('ERROR Unhandled exception in request')) return HTTPServerError(request=req) + def sort_nodes(self, nodes): + ''' + Sorts nodes in-place (and returns the sorted list) according to + the configured strategy. The default "sorting" is to randomly + shuffle the nodes. If the "timing" strategy is chosen, the nodes + are sorted according to the stored timing data. + ''' + # In the case of timing sorting, shuffling ensures that close timings + # (ie within the rounding resolution) won't prefer one over another. + # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/) + shuffle(nodes) + if self.sorting_method == 'timing': + now = time() + + def key_func(node): + timing, expires = self.node_timings.get(node['ip'], (-1.0, 0)) + return timing if expires > now else -1.0 + nodes.sort(key=key_func) + return nodes + + def set_node_timing(self, node, timing): + if self.sorting_method != 'timing': + return + now = time() + timing = round(timing, 3) # sort timings to the millisecond + self.node_timings[node['ip']] = (timing, now + self.timing_expiry) + def app_factory(global_conf, **local_conf): """paste.deploy app factory for creating WSGI proxy apps.""" diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index f720c4ae74..aede230037 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -32,6 +32,7 @@ import time from urllib import unquote, quote from hashlib import md5 from tempfile import mkdtemp +import random import eventlet from eventlet import sleep, spawn, Timeout, util, wsgi, listen @@ -334,7 +335,8 @@ class FakeRing(object): self.devs[x] = devs[x] = \ {'ip': '10.0.0.%s' % x, 'port': 1000 + x, - 'device': 'sd' + (chr(ord('a') + x))} + 'device': 'sd' + (chr(ord('a') + x)), + 'id': x} return 1, devs def get_part_nodes(self, part): @@ -410,15 +412,6 @@ def set_http_connect(*args, **kwargs): swift.proxy.controllers.container.http_connect = new_connect -def set_shuffle(): - shuffle = lambda l: None - proxy_server.shuffle = shuffle - swift.proxy.controllers.base.shuffle = shuffle - swift.proxy.controllers.obj.shuffle = shuffle - swift.proxy.controllers.account.shuffle = shuffle - swift.proxy.controllers.container.shuffle = shuffle - - # tests class TestController(unittest.TestCase): @@ -753,6 +746,41 @@ class TestProxyServer(unittest.TestCase): finally: rmtree(swift_dir, ignore_errors=True) + def test_node_timing(self): + baseapp = proxy_server.Application({'sorting_method': 'timing'}, + FakeMemcache(), + container_ring=FakeRing(), + object_ring=FakeRing(), + account_ring=FakeRing()) + self.assertEquals(baseapp.node_timings, {}) + + req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'HEAD'}) + baseapp.update_request(req) + resp = baseapp.handle_request(req) + self.assertEquals(resp.status_int, 503) # couldn't connect to anything + exp_timings = {} + self.assertEquals(baseapp.node_timings, exp_timings) + + proxy_server.time = lambda: times.pop(0) + try: + times = [time.time()] + exp_timings = {'127.0.0.1': (0.1, + times[0] + baseapp.timing_expiry)} + baseapp.set_node_timing({'ip': '127.0.0.1'}, 0.1) + self.assertEquals(baseapp.node_timings, exp_timings) + finally: + proxy_server.time = time.time + + proxy_server.shuffle = lambda l: l + try: + nodes = [{'ip': '127.0.0.1'}, {'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}] + res = baseapp.sort_nodes(nodes) + exp_sorting = [{'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}, + {'ip': '127.0.0.1'}] + self.assertEquals(res, exp_sorting) + finally: + proxy_server.shuffle = random.shuffle + class TestObjectController(unittest.TestCase): @@ -1749,9 +1777,9 @@ class TestObjectController(unittest.TestCase): def test_error_limiting(self): with save_globals(): - set_shuffle() controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') + controller.app.sort_nodes = lambda l: l self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), 200) self.assertEquals(controller.app.object_ring.devs[0]['errors'], 2) @@ -4214,9 +4242,9 @@ class TestContainerController(unittest.TestCase): def test_error_limiting(self): with save_globals(): - set_shuffle() controller = proxy_server.ContainerController(self.app, 'account', 'container') + controller.app.sort_nodes = lambda l: l self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200, missing_container=False) self.assertEquals( @@ -5212,6 +5240,12 @@ class FakeObjectController(object): for node in ring.get_more_nodes(partition): yield node + def sort_nodes(self, nodes): + return nodes + + def set_node_timing(self, node, timing): + return + class Stub(object): pass