diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 3f5ddcec09..164ddf1071 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -87,6 +87,13 @@ use = egg:swift#proxy # Once segment rate-limiting kicks in for an object, limit segments served # to N per second. # rate_limit_segments_per_sec = 1 +# Storage nodes can be chosen at random (shuffle) or by using timing +# measurements. Using timing measurements may allow for lower overall latency. +# The valid values for sorting_method are "shuffle" and "timing" +# sorting_method = shuffle +# If the timing sorting_method is used, the timings will only be valid for +# the number of seconds configured by timing_expiry. +# timing_expiry = 300 [filter:tempauth] use = egg:swift#tempauth diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py index 4237c5cfe8..adb23733ff 100644 --- a/swift/proxy/controllers/account.py +++ b/swift/proxy/controllers/account.py @@ -26,7 +26,6 @@ import time from urllib import unquote -from random import shuffle from swift.common.utils import normalize_timestamp, public from swift.common.constraints import check_metadata, MAX_ACCOUNT_NAME_LENGTH @@ -49,7 +48,7 @@ class AccountController(Controller): def GETorHEAD(self, req): """Handler for HTTP GET/HEAD requests.""" partition, nodes = self.app.account_ring.get_nodes(self.account_name) - shuffle(nodes) + nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( req, _('Account'), partition, nodes, req.path_info.rstrip('/'), len(nodes)) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index a867182ede..b0ddf1c2ad 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -400,10 +400,12 @@ class Controller(object): break attempts_left -= 1 try: + start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], partition, 'HEAD', path, 
headers) + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() resp.read() @@ -491,10 +493,12 @@ class Controller(object): headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} for node in self.iter_nodes(part, nodes, self.app.container_ring): try: + start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], part, 'HEAD', path, headers) + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() resp.read() @@ -558,11 +562,13 @@ class Controller(object): self.app.logger.thread_locals = logger_thread_locals for node in nodes: try: + start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], node['device'], part, method, path, headers=headers, query_string=query) conn.node = node + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): resp = conn.getresponse() if not is_informational(resp.status) and \ @@ -767,6 +773,7 @@ class Controller(object): break if self.error_limited(node): continue + start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): headers = dict(req.headers) @@ -775,6 +782,7 @@ class Controller(object): node['ip'], node['port'], node['device'], partition, req.method, path, headers=headers, query_string=req.query_string) + self.app.set_node_timing(node, time.time() - start_node_timing) with Timeout(self.app.node_timeout): possible_source = conn.getresponse() # See NOTE: swift_conn at top of file about this. 
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 90a8861d56..61d1aa1198 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -26,7 +26,6 @@ import time from urllib import unquote -from random import shuffle from swift.common.utils import normalize_timestamp, public, csv_append from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH @@ -69,7 +68,7 @@ class ContainerController(Controller): return HTTPNotFound(request=req) part, nodes = self.app.container_ring.get_nodes( self.account_name, self.container_name) - shuffle(nodes) + nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( req, _('Container'), part, nodes, req.path_info, len(nodes)) if self.app.memcache: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 30225c93f9..53188e65ef 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -31,7 +31,6 @@ import time from datetime import datetime from urllib import unquote, quote from hashlib import md5 -from random import shuffle from eventlet import sleep, GreenPile from eventlet.queue import Queue @@ -123,7 +122,7 @@ class SegmentedIterable(object): sleep(max(self.next_get_time - time.time(), 0)) self.next_get_time = time.time() + \ 1.0 / self.controller.app.rate_limit_segments_per_sec - shuffle(nodes) + nodes = self.controller.app.sort_nodes(nodes) resp = self.controller.GETorHEAD_base( req, _('Object'), partition, self.controller.iter_nodes(partition, nodes, @@ -271,7 +270,7 @@ class ObjectController(Controller): lreq.environ['QUERY_STRING'] = \ 'format=json&prefix=%s&marker=%s' % (quote(lprefix), quote(marker)) - shuffle(lnodes) + lnodes = self.app.sort_nodes(lnodes) lresp = self.GETorHEAD_base( lreq, _('Container'), lpartition, lnodes, lreq.path_info, len(lnodes)) @@ -337,7 +336,7 @@ class ObjectController(Controller): partition, nodes = self.app.object_ring.get_nodes( 
self.account_name, self.container_name, self.object_name) - shuffle(nodes) + nodes = self.app.sort_nodes(nodes) resp = self.GETorHEAD_base( req, _('Object'), partition, self.iter_nodes(partition, nodes, self.app.object_ring), @@ -558,10 +557,12 @@ class ObjectController(Controller): self.app.logger.thread_locals = logger_thread_locals for node in nodes: try: + start_time = time.time() with ConnectionTimeout(self.app.conn_timeout): conn = http_connect( node['ip'], node['port'], node['device'], part, 'PUT', path, headers) + self.app.set_node_timing(node, time.time() - start_time) with Timeout(self.app.node_timeout): resp = conn.getexpect() if resp.status == HTTP_CONTINUE: diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 40234276d0..e73c91b63f 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -28,6 +28,8 @@ import mimetypes import os from ConfigParser import ConfigParser import uuid +from random import shuffle +from time import time from eventlet import Timeout @@ -108,6 +110,9 @@ class Application(object): a.strip() for a in conf.get('cors_allow_origin', '').split(',') if a.strip()] + self.node_timings = {} + self.timing_expiry = int(conf.get('timing_expiry', 300)) + self.sorting_method = conf.get('sorting_method', 'shuffle').lower() def get_controller(self, path): """ @@ -242,6 +247,33 @@ class Application(object): self.logger.exception(_('ERROR Unhandled exception in request')) return HTTPServerError(request=req) + def sort_nodes(self, nodes): + ''' + Sorts nodes in-place (and returns the sorted list) according to + the configured strategy. The default "sorting" is to randomly + shuffle the nodes. If the "timing" strategy is chosen, the nodes + are sorted according to the stored timing data. + ''' + # In the case of timing sorting, shuffling ensures that close timings + # (ie within the rounding resolution) won't prefer one over another. 
+ # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/) + shuffle(nodes) + if self.sorting_method == 'timing': + now = time() + + def key_func(node): + timing, expires = self.node_timings.get(node['ip'], (-1.0, 0)) + return timing if expires > now else -1.0 + nodes.sort(key=key_func) + return nodes + + def set_node_timing(self, node, timing): + if self.sorting_method != 'timing': + return + now = time() + timing = round(timing, 3) # round timings to the millisecond + self.node_timings[node['ip']] = (timing, now + self.timing_expiry) + def app_factory(global_conf, **local_conf): """paste.deploy app factory for creating WSGI proxy apps.""" diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index fcbffedaa1..87fcdbc0d5 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -32,6 +32,7 @@ import time from urllib import unquote, quote from hashlib import md5 from tempfile import mkdtemp +import random import eventlet from eventlet import sleep, spawn, Timeout, util, wsgi, listen @@ -334,7 +335,8 @@ class FakeRing(object): self.devs[x] = devs[x] = \ {'ip': '10.0.0.%s' % x, 'port': 1000 + x, - 'device': 'sd' + (chr(ord('a') + x))} + 'device': 'sd' + (chr(ord('a') + x)), + 'id': x} return 1, devs def get_part_nodes(self, part): @@ -410,15 +412,6 @@ def set_http_connect(*args, **kwargs): swift.proxy.controllers.container.http_connect = new_connect -def set_shuffle(): - shuffle = lambda l: None - proxy_server.shuffle = shuffle - swift.proxy.controllers.base.shuffle = shuffle - swift.proxy.controllers.obj.shuffle = shuffle - swift.proxy.controllers.account.shuffle = shuffle - swift.proxy.controllers.container.shuffle = shuffle - - # tests class TestController(unittest.TestCase): @@ -763,6 +756,41 @@ class TestProxyServer(unittest.TestCase): finally: rmtree(swift_dir, ignore_errors=True) + def test_node_timing(self): + baseapp = proxy_server.Application({'sorting_method': 'timing'}, + FakeMemcache(), 
container_ring=FakeRing(), + object_ring=FakeRing(), + account_ring=FakeRing()) + self.assertEquals(baseapp.node_timings, {}) + + req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'HEAD'}) + baseapp.update_request(req) + resp = baseapp.handle_request(req) + self.assertEquals(resp.status_int, 503) # couldn't connect to anything + exp_timings = {} + self.assertEquals(baseapp.node_timings, exp_timings) + + proxy_server.time = lambda: times.pop(0) + try: + times = [time.time()] + exp_timings = {'127.0.0.1': (0.1, + times[0] + baseapp.timing_expiry)} + baseapp.set_node_timing({'ip': '127.0.0.1'}, 0.1) + self.assertEquals(baseapp.node_timings, exp_timings) + finally: + proxy_server.time = time.time + + proxy_server.shuffle = lambda l: l + try: + nodes = [{'ip': '127.0.0.1'}, {'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}] + res = baseapp.sort_nodes(nodes) + exp_sorting = [{'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}, + {'ip': '127.0.0.1'}] + self.assertEquals(res, exp_sorting) + finally: + proxy_server.shuffle = random.shuffle + class TestObjectController(unittest.TestCase): @@ -1759,9 +1787,9 @@ class TestObjectController(unittest.TestCase): def test_error_limiting(self): with save_globals(): - set_shuffle() controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') + controller.app.sort_nodes = lambda l: l self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), 200) self.assertEquals(controller.app.object_ring.devs[0]['errors'], 2) @@ -4224,9 +4252,9 @@ class TestContainerController(unittest.TestCase): def test_error_limiting(self): with save_globals(): - set_shuffle() controller = proxy_server.ContainerController(self.app, 'account', 'container') + controller.app.sort_nodes = lambda l: l self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200, missing_container=False) self.assertEquals( @@ -5222,6 +5250,12 @@ class FakeObjectController(object): for node in ring.get_more_nodes(partition): yield node + def 
sort_nodes(self, nodes): + return nodes + + def set_node_timing(self, node, timing): + return + class Stub(object): pass