diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index bfbee3734a..2f5d2b9fc4 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -115,12 +115,16 @@ use = egg:swift#proxy # to N per second. # rate_limit_segments_per_sec = 1 # -# Storage nodes can be chosen at random (shuffle) or by using timing -# measurements. Using timing measurements may allow for lower overall latency. -# The valid values for sorting_method are "shuffle" and "timing" +# Storage nodes can be chosen at random (shuffle), by using timing +# measurements (timing), or by using an explicit match (affinity). +# Using timing measurements may allow for lower overall latency, while +# using affinity allows for finer control. In both the timing and +# affinity cases, equally-sorting nodes are still randomly chosen to +# spread load. +# The valid values for sorting_method are "affinity", "shuffle", and "timing". # sorting_method = shuffle # -# If the timing sorting_method is used, the timings will only be valid for +# If the "timing" sorting_method is used, the timings will only be valid for # the number of seconds configured by timing_expiry. # timing_expiry = 300 # @@ -133,6 +137,16 @@ use = egg:swift#proxy # '* replicas' at the end to have it use the number given times the number of # replicas for the ring being used for the request. # request_node_count = 2 * replicas +# +# Which backend servers to prefer on reads. Format is r for region +# N or rz for region N, zone M. The value after the equals is +# the priority; lower numbers are higher priority. +# +# Example: first read from region 1 zone 1, then region 1 zone 2, then +# anything in region 2, then everything else: +# read_affinity = r1z1=100, r1z2=200, r2=300 +# Default is empty, meaning no preference. +# read_affinity = [filter:tempauth] use = egg:swift#tempauth diff --git a/swift/common/utils.py b/swift/common/utils.py index 6c012b80b8..5d168ce9d4 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -17,8 +17,10 @@ import errno import fcntl +import operator import os import pwd +import re import sys import time import uuid @@ -1523,6 +1525,63 @@ def validate_sync_to(value, allowed_sync_hosts): return None +def affinity_key_function(affinity_str): + """Turns an affinity config value into a function suitable for passing to + sort(). After doing so, the array will be sorted with respect to the given + ordering. + + For example, if affinity_str is "r1=1, r2z7=2, r2z8=2", then the array + will be sorted with all nodes from region 1 (r1=1) first, then all the + nodes from region 2 zones 7 and 8 (r2z7=2 and r2z8=2), then everything + else. + + Note that the order of the pieces of affinity_str is irrelevant; the + priority values are what comes after the equals sign. + + If affinity_str is empty or all whitespace, then the resulting function + will not alter the ordering of the nodes. However, if affinity_str + contains an invalid value, then None is returned. + + :param affinity_str: affinity config value, e.g. "r1z2=3" + or "r1=1, r2z1=2, r2z2=2" + :returns: single-argument function, or None if argument invalid + + """ + affinity_str = affinity_str.strip() + + if not affinity_str: + return lambda x: 0 + + priority_matchers = [] + pieces = [s.strip() for s in affinity_str.split(',')] + for piece in pieces: + # matches r= or rz= + match = re.match("r(\d+)(?:z(\d+))?=(\d+)$", piece) + if match: + region, zone, priority = match.groups() + region = int(region) + priority = int(priority) + zone = int(zone) if zone else None + + matcher = {'region': region, 'priority': priority} + if zone is not None: + matcher['zone'] = zone + priority_matchers.append(matcher) + else: + raise ValueError("Invalid affinity value: %r" % affinity_str) + + priority_matchers.sort(key=operator.itemgetter('priority')) + + def keyfn(ring_node): + for matcher in priority_matchers: + if (matcher['region'] == ring_node['region'] + and ('zone' not in matcher + or matcher['zone'] == ring_node['zone'])): + return matcher['priority'] + return 4294967296 # 2^32, i.e. "a big number" + return keyfn + + def get_remote_client(req): # remote host for zeus client = req.headers.get('x-cluster-client-ip') diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 65dd1e3f12..6234023e86 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -34,7 +34,8 @@ from eventlet import Timeout from swift.common.ring import Ring from swift.common.utils import cache_from_env, get_logger, \ - get_remote_client, split_path, config_true_value, generate_trans_id + get_remote_client, split_path, config_true_value, generate_trans_id, \ + affinity_key_function from swift.common.constraints import check_utf8 from swift.proxy.controllers import AccountController, ObjectController, \ ContainerController @@ -125,6 +126,13 @@ class Application(object): else: raise ValueError( 'Invalid request_node_count value: %r' % ''.join(value)) + try: + read_affinity = conf.get('read_affinity', '') + self.read_affinity_sort_key = affinity_key_function(read_affinity) + except ValueError as err: + # make the message a little more useful + raise ValueError("Invalid read_affinity value: %r (%s)" % + (read_affinity, err.message)) def get_controller(self, path): """ @@ -277,6 +285,8 @@ class Application(object): timing, expires = self.node_timings.get(node['ip'], (-1.0, 0)) return timing if expires > now else -1.0 nodes.sort(key=key_func) + elif self.sorting_method == 'affinity': + nodes.sort(key=self.read_affinity_sort_key) return nodes def set_node_timing(self, node, timing): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 90de9479d0..80d661f655 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1494,6 +1494,57 @@ class UnsafeXrange(object): self.concurrent_calls -= 1 +class TestAffinityKeyFunction(unittest.TestCase): + def setUp(self): + self.nodes = [dict(id=0, region=1, zone=1), + dict(id=1, region=1, zone=2), + dict(id=2, region=2, zone=1), + dict(id=3, region=2, zone=2), + dict(id=4, region=3, zone=1), + dict(id=5, region=3, zone=2), + dict(id=6, region=4, zone=0), + dict(id=7, region=4, zone=1)] + + def test_single_region(self): + keyfn = utils.affinity_key_function("r3=1") + ids = [n['id'] for n in sorted(self.nodes, key=keyfn)] + self.assertEqual([4, 5, 0, 1, 2, 3, 6, 7], ids) + + def test_bogus_value(self): + self.assertRaises(ValueError, + utils.affinity_key_function, "r3") + self.assertRaises(ValueError, + utils.affinity_key_function, "r3=elephant") + + def test_empty_value(self): + # Empty's okay, it just means no preference + keyfn = utils.affinity_key_function("") + self.assert_(callable(keyfn)) + ids = [n['id'] for n in sorted(self.nodes, key=keyfn)] + self.assertEqual([0, 1, 2, 3, 4, 5, 6, 7], ids) + + def test_all_whitespace_value(self): + # Empty's okay, it just means no preference + keyfn = utils.affinity_key_function(" \n") + self.assert_(callable(keyfn)) + ids = [n['id'] for n in sorted(self.nodes, key=keyfn)] + self.assertEqual([0, 1, 2, 3, 4, 5, 6, 7], ids) + + def test_with_zone_zero(self): + keyfn = utils.affinity_key_function("r4z0=1") + ids = [n['id'] for n in sorted(self.nodes, key=keyfn)] + self.assertEqual([6, 0, 1, 2, 3, 4, 5, 7], ids) + + def test_multiple(self): + keyfn = utils.affinity_key_function("r1=100, r4=200, r3z1=1") + ids = [n['id'] for n in sorted(self.nodes, key=keyfn)] + self.assertEqual([4, 0, 1, 6, 7, 2, 3, 5], ids) + + def test_more_specific_after_less_specific(self): + keyfn = utils.affinity_key_function("r2=100, r2z2=50") + ids = [n['id'] for n in sorted(self.nodes, key=keyfn)] + self.assertEqual([3, 2, 0, 1, 4, 5, 6, 7], ids) + class TestGreenthreadSafeIterator(unittest.TestCase): def increment(self, iterable): plus_ones = [] diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index af1b50745a..0bbcd8d8f0 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -661,25 +661,34 @@ class TestProxyServer(unittest.TestCase): exp_timings = {} self.assertEquals(baseapp.node_timings, exp_timings) - proxy_server.time = lambda: times.pop(0) - try: - times = [time.time()] - exp_timings = {'127.0.0.1': (0.1, - times[0] + baseapp.timing_expiry)} + times = [time.time()] + exp_timings = {'127.0.0.1': (0.1, times[0] + baseapp.timing_expiry)} + with mock.patch('swift.proxy.server.time', lambda: times.pop(0)): baseapp.set_node_timing({'ip': '127.0.0.1'}, 0.1) - self.assertEquals(baseapp.node_timings, exp_timings) - finally: - proxy_server.time = time.time + self.assertEquals(baseapp.node_timings, exp_timings) - proxy_server.shuffle = lambda l: l - try: - nodes = [{'ip': '127.0.0.1'}, {'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}] + nodes = [{'ip': '127.0.0.1'}, {'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}] + with mock.patch('swift.proxy.server.shuffle', lambda l: l): res = baseapp.sort_nodes(nodes) - exp_sorting = [{'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}, - {'ip': '127.0.0.1'}] - self.assertEquals(res, exp_sorting) - finally: - proxy_server.shuffle = random.shuffle + exp_sorting = [{'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}, + {'ip': '127.0.0.1'}] + self.assertEquals(res, exp_sorting) + + def test_node_affinity(self): + baseapp = proxy_server.Application({'sorting_method': 'affinity', + 'read_affinity': 'r1=1'}, + FakeMemcache(), + container_ring=FakeRing(), + object_ring=FakeRing(), + account_ring=FakeRing()) + + nodes = [{'region': 2, 'zone': 1, 'ip': '127.0.0.1'}, + {'region': 1, 'zone': 2, 'ip': '127.0.0.2'}] + with mock.patch('swift.proxy.server.shuffle', lambda x:x): + app_sorted = baseapp.sort_nodes(nodes) + exp_sorted = [{'region': 1, 'zone': 2, 'ip': '127.0.0.2'}, + {'region': 2, 'zone': 1, 'ip': '127.0.0.1'}] + self.assertEquals(exp_sorted, app_sorted) class TestObjectController(unittest.TestCase):