diff --git a/swift/account/reaper.py b/swift/account/reaper.py index 41b4485d1b..7b62df03fc 100644 --- a/swift/account/reaper.py +++ b/swift/account/reaper.py @@ -31,6 +31,7 @@ from swift.common.constraints import check_drive from swift.common.direct_client import direct_delete_container, \ direct_delete_object, direct_get_container from swift.common.exceptions import ClientException +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER from swift.common.ring import Ring from swift.common.ring.utils import is_local_device from swift.common.utils import get_logger, whataremyips, config_true_value, \ @@ -370,7 +371,8 @@ class AccountReaper(Daemon): node, part, account, container, marker=marker, conn_timeout=self.conn_timeout, - response_timeout=self.node_timeout) + response_timeout=self.node_timeout, + headers={USE_REPLICATION_NETWORK_HEADER: 'true'}) self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 self.logger.increment('return_codes.2') @@ -418,7 +420,8 @@ class AccountReaper(Daemon): 'X-Account-Partition': str(account_partition), 'X-Account-Device': anode['device'], 'X-Account-Override-Deleted': 'yes', - 'X-Timestamp': timestamp.internal}) + 'X-Timestamp': timestamp.internal, + USE_REPLICATION_NETWORK_HEADER: 'true'}) successes += 1 self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 @@ -494,7 +497,8 @@ class AccountReaper(Daemon): 'X-Container-Partition': str(container_partition), 'X-Container-Device': cnode['device'], 'X-Backend-Storage-Policy-Index': policy_index, - 'X-Timestamp': timestamp.internal}) + 'X-Timestamp': timestamp.internal, + USE_REPLICATION_NETWORK_HEADER: 'true'}) successes += 1 self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 diff --git a/swift/common/direct_client.py b/swift/common/direct_client.py index 86a644d8b0..a68159f207 100644 --- a/swift/common/direct_client.py +++ b/swift/common/direct_client.py @@ -29,6 +29,8 @@ from six.moves.http_client import HTTPException from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ClientException +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER, \ + get_ip_port from swift.common.swob import normalize_etag from swift.common.utils import Timestamp, FileLikeIter, quote from swift.common.http import HTTP_NO_CONTENT, HTTP_INSUFFICIENT_STORAGE, \ @@ -100,9 +102,10 @@ def _make_req(node, part, method, path, headers, stype, if content_length is None: headers['Transfer-Encoding'] = 'chunked' + ip, port = get_ip_port(node, headers) headers.setdefault('X-Backend-Allow-Reserved-Names', 'true') with Timeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], part, + conn = http_connect(ip, port, node['device'], part, method, path, headers=headers) if contents is not None: @@ -145,6 +148,9 @@ def _get_direct_account_container(path, stype, node, part, Do not use directly use the get_direct_account or get_direct_container instead. """ + if headers is None: + headers = {} + params = ['format=json'] if marker: params.append('marker=%s' % quote(marker)) @@ -159,8 +165,10 @@ def _get_direct_account_container(path, stype, node, part, if reverse: params.append('reverse=%s' % quote(reverse)) qs = '&'.join(params) + + ip, port = get_ip_port(node, headers) with Timeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], part, + conn = http_connect(ip, port, node['device'], part, 'GET', path, query_string=qs, headers=gen_headers(hdrs_in=headers)) with Timeout(response_timeout): @@ -200,7 +208,8 @@ def gen_headers(hdrs_in=None, add_ts=True): def direct_get_account(node, part, account, marker=None, limit=None, prefix=None, delimiter=None, conn_timeout=5, - response_timeout=15, end_marker=None, reverse=None): + response_timeout=15, end_marker=None, reverse=None, + headers=None): """ Get listings directly from the account server. @@ -220,6 +229,7 @@ def direct_get_account(node, part, account, marker=None, limit=None, """ path = _make_path(account) return _get_direct_account_container(path, "Account", node, part, + headers=headers, marker=marker, limit=limit, prefix=prefix, delimiter=delimiter, @@ -240,7 +250,7 @@ def direct_delete_account(node, part, account, conn_timeout=5, def direct_head_container(node, part, account, container, conn_timeout=5, - response_timeout=15): + response_timeout=15, headers=None): """ Request container information directly from the container server. @@ -253,8 +263,11 @@ def direct_head_container(node, part, account, container, conn_timeout=5, :returns: a dict containing the response's headers in a HeaderKeyDict :raises ClientException: HTTP HEAD request failed """ + if headers is None: + headers = {} + path = _make_path(account, container) - resp = _make_req(node, part, 'HEAD', path, gen_headers(), + resp = _make_req(node, part, 'HEAD', path, gen_headers(headers), 'Container', conn_timeout, response_timeout) resp_headers = HeaderKeyDict() @@ -431,9 +444,10 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5, if headers is None: headers = {} + ip, port = get_ip_port(node, headers) path = _make_path(account, container, obj) with Timeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], part, + conn = http_connect(ip, port, node['device'], part, 'GET', path, headers=gen_headers(headers)) with Timeout(response_timeout): resp = conn.getresponse() @@ -551,6 +565,9 @@ def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5, """ Get suffix hashes directly from the object server. + Note that unlike other ``direct_client`` functions, this one defaults + to using the replication network to make requests. + :param node: node dictionary from the ring :param part: partition the container is on :param conn_timeout: timeout in seconds for establishing the connection @@ -562,9 +579,11 @@ def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5, if headers is None: headers = {} + headers.setdefault(USE_REPLICATION_NETWORK_HEADER, 'true') + ip, port = get_ip_port(node, headers) path = '/%s' % '-'.join(suffixes) with Timeout(conn_timeout): - conn = http_connect(node['replication_ip'], node['replication_port'], + conn = http_connect(ip, port, node['device'], part, 'REPLICATE', path, headers=gen_headers(headers)) with Timeout(response_timeout): diff --git a/swift/common/internal_client.py b/swift/common/internal_client.py index c165909e71..58b06d1cb5 100644 --- a/swift/common/internal_client.py +++ b/swift/common/internal_client.py @@ -29,6 +29,7 @@ from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX from swift.common.exceptions import ClientException from swift.common.http import (HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES, is_client_error, is_server_error) +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER from swift.common.swob import Request, bytes_to_wsgi from swift.common.utils import quote, closing_if_possible from swift.common.wsgi import loadapp, pipeline_property @@ -147,13 +148,14 @@ class InternalClient(object): """ def __init__(self, conf_path, user_agent, request_tries, - allow_modify_pipeline=False): + allow_modify_pipeline=False, use_replication_network=False): if request_tries < 1: raise ValueError('request_tries must be positive') self.app = loadapp(conf_path, allow_modify_pipeline=allow_modify_pipeline) self.user_agent = user_agent self.request_tries = request_tries + self.use_replication_network = use_replication_network get_object_ring = pipeline_property('get_object_ring') container_ring = pipeline_property('container_ring') @@ -186,6 +188,9 @@ class InternalClient(object): headers = dict(headers) headers['user-agent'] = self.user_agent headers.setdefault('x-backend-allow-reserved-names', 'true') + if self.use_replication_network: + headers.setdefault(USE_REPLICATION_NETWORK_HEADER, 'true') + for attempt in range(self.request_tries): resp = exc_type = exc_value = exc_traceback = None req = Request.blank( diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index c120ac2778..c660da74b2 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -40,14 +40,15 @@ from swift.common.utils import split_path, validate_device_partition, \ close_if_possible, maybe_multipart_byteranges_to_document_iters, \ multipart_byteranges_to_document_iters, parse_content_type, \ parse_content_range, csv_append, list_from_csv, Spliterator, quote, \ - RESERVED + RESERVED, config_true_value from swift.common.wsgi import make_subrequest -from swift.container.reconciler import MISPLACED_OBJECTS_ACCOUNT OBJECT_TRANSIENT_SYSMETA_PREFIX = 'x-object-transient-sysmeta-' OBJECT_SYSMETA_CONTAINER_UPDATE_OVERRIDE_PREFIX = \ 'x-object-sysmeta-container-update-override-' +USE_REPLICATION_NETWORK_HEADER = 'x-backend-use-replication-network' +MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects' if six.PY2: @@ -849,3 +850,15 @@ def update_ignore_range_header(req, name): raise ValueError('Header name must not contain commas') hdr = 'X-Backend-Ignore-Range-If-Metadata-Present' req.headers[hdr] = csv_append(req.headers.get(hdr), name) + + +def get_ip_port(node, headers): + use_replication_network = False + for h, v in headers.items(): + if h.lower() == USE_REPLICATION_NETWORK_HEADER: + use_replication_network = config_true_value(v) + break + if use_replication_network: + return node['replication_ip'], node['replication_port'] + else: + return node['ip'], node['port'] diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py index 7a8b5fd1e9..9bf225eb64 100644 --- a/swift/container/reconciler.py +++ b/swift/container/reconciler.py @@ -27,11 +27,12 @@ from swift.common.direct_client import ( direct_head_container, direct_delete_container_object, direct_put_container_object, ClientException) from swift.common.internal_client import InternalClient, UnexpectedResponse +from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \ + USE_REPLICATION_NETWORK_HEADER from swift.common.utils import get_logger, split_path, majority_size, \ FileLikeIter, Timestamp, last_modified_date_to_timestamp, \ LRUCache, decode_timestamps -MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects' MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour CONTAINER_POLICY_TTL = 30 @@ -224,6 +225,7 @@ def add_to_reconciler_queue(container_ring, account, container, obj, 'X-Etag': obj_timestamp, 'X-Timestamp': x_timestamp, 'X-Content-Type': q_op_type, + USE_REPLICATION_NETWORK_HEADER: 'true', } def _check_success(*args, **kwargs): @@ -307,7 +309,8 @@ def direct_get_container_policy_index(container_ring, account_name, """ def _eat_client_exception(*args): try: - return direct_head_container(*args) + return direct_head_container(*args, headers={ + USE_REPLICATION_NETWORK_HEADER: 'true'}) except ClientException as err: if err.http_status == 404: return err.http_headers @@ -333,6 +336,10 @@ def direct_delete_container_entry(container_ring, account_name, container_name, object listing. Does not talk to object servers; use this only when a container entry does not actually have a corresponding object. """ + if headers is None: + headers = {} + headers[USE_REPLICATION_NETWORK_HEADER] = 'true' + pool = GreenPool() part, nodes = container_ring.get_nodes(account_name, container_name) for node in nodes: @@ -360,9 +367,11 @@ class ContainerReconciler(Daemon): '/etc/swift/container-reconciler.conf' self.logger = get_logger(conf, log_route='container-reconciler') request_tries = int(conf.get('request_tries') or 3) - self.swift = InternalClient(conf_path, - 'Swift Container Reconciler', - request_tries) + self.swift = InternalClient( + conf_path, + 'Swift Container Reconciler', + request_tries, + use_replication_network=True) self.stats = defaultdict(int) self.last_stat_time = time.time() diff --git a/swift/container/sharder.py b/swift/container/sharder.py index dd33043ae4..2c70a1b27f 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -29,6 +29,7 @@ from swift.common.constraints import check_drive, AUTO_CREATE_ACCOUNT_PREFIX from swift.common.direct_client import (direct_put_container, DirectClientException) from swift.common.exceptions import DeviceUnavailable +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER from swift.common.ring.utils import is_local_device from swift.common.swob import str_to_wsgi from swift.common.utils import get_logger, config_true_value, \ @@ -409,7 +410,8 @@ class ContainerSharder(ContainerReplicator): internal_client_conf_path, 'Swift Container Sharder', request_tries, - allow_modify_pipeline=False) + allow_modify_pipeline=False, + use_replication_network=True) except (OSError, IOError) as err: if err.errno != errno.ENOENT and \ not str(err).endswith(' not found'): @@ -623,6 +625,7 @@ class ContainerSharder(ContainerReplicator): part, nodes = self.ring.get_nodes(account, container) headers = headers or {} headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD, + USE_REPLICATION_NETWORK_HEADER: 'True', 'User-Agent': 'container-sharder %s' % os.getpid(), 'X-Timestamp': Timestamp.now().normal, 'Content-Length': len(body), diff --git a/swift/container/sync.py b/swift/container/sync.py index 529f85a448..34b300fb03 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -241,7 +241,8 @@ class ContainerSync(Daemon): internal_client_conf = internal_client_conf_path try: self.swift = InternalClient( - internal_client_conf, 'Swift Container Sync', request_tries) + internal_client_conf, 'Swift Container Sync', request_tries, + use_replication_network=True) except (OSError, IOError) as err: if err.errno != errno.ENOENT and \ not str(err).endswith(' not found'): diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 1aef5fc2ff..b041f99c0a 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -134,7 +134,8 @@ class ObjectExpirer(Daemon): request_tries = int(self.conf.get('request_tries') or 3) self.swift = swift or InternalClient( - self.ic_conf_path, 'Swift Object Expirer', request_tries) + self.ic_conf_path, 'Swift Object Expirer', request_tries, + use_replication_network=True) self.processes = int(self.conf.get('processes', 0)) self.process = int(self.conf.get('process', 0)) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 95ff8e7653..2912da3ea6 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -63,7 +63,7 @@ from swift.common.swob import Request, Response, Range, \ from swift.common.request_helpers import strip_sys_meta_prefix, \ strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \ http_response_to_document_iters, is_object_transient_sysmeta, \ - strip_object_transient_sysmeta_prefix + strip_object_transient_sysmeta_prefix, get_ip_port from swift.common.storage_policy import POLICIES @@ -1264,11 +1264,13 @@ class ResumingGetter(object): # a request may be specialised with specific backend headers if self.header_provider: req_headers.update(self.header_provider()) + + ip, port = get_ip_port(node, req_headers) start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect( - node['ip'], node['port'], node['device'], + ip, port, node['device'], self.partition, self.req_method, self.path, headers=req_headers, query_string=self.req_query_string) @@ -1766,11 +1768,12 @@ class Controller(object): headers['Content-Length'] = str(len(body)) for node in nodes: try: + ip, port = get_ip_port(node, headers) start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], part, method, path, - headers=headers, query_string=query) + conn = http_connect( + ip, port, node['device'], part, method, path, + headers=headers, query_string=query) conn.node = node self.app.set_node_timing(node, time.time() - start_node_timing) if body: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 058c445f10..ceab334463 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -73,7 +73,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError, \ normalize_etag from swift.common.request_helpers import update_etag_is_at_header, \ - resolve_etag_is_at_header, validate_internal_obj + resolve_etag_is_at_header, validate_internal_obj, get_ip_port def check_content_type(req): @@ -1683,9 +1683,10 @@ class Putter(object): @classmethod def _make_connection(cls, node, part, path, headers, conn_timeout, node_timeout): + ip, port = get_ip_port(node, headers) start_time = time.time() with ConnectionTimeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], + conn = http_connect(ip, port, node['device'], part, 'PUT', path, headers) connect_duration = time.time() - start_time diff --git a/test/unit/__init__.py b/test/unit/__init__.py index e019e3058a..adb6663903 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -215,12 +215,13 @@ class PatchPolicies(object): class FakeRing(Ring): def __init__(self, replicas=3, max_more_nodes=0, part_power=0, - base_port=1000): + base_port=1000, separate_replication=False): self.serialized_path = '/foo/bar/object.ring.gz' self._base_port = base_port self.max_more_nodes = max_more_nodes self._part_shift = 32 - part_power self._init_device_char() + self.separate_replication = separate_replication # 9 total nodes (6 more past the initial 3) is the cap, no matter if # this is set higher, or R^2 for R replicas self.set_replicas(replicas) @@ -256,11 +257,16 @@ class FakeRing(Ring): for x in range(self.replicas): ip = '10.0.0.%s' % x port = self._base_port + x + if self.separate_replication: + repl_ip = '10.0.1.%s' % x + repl_port = port + 100 + else: + repl_ip, repl_port = ip, port dev = { 'ip': ip, - 'replication_ip': ip, + 'replication_ip': repl_ip, 'port': port, - 'replication_port': port, + 'replication_port': repl_port, 'device': self.device_char, 'zone': x % 3, 'region': x % 2, @@ -278,10 +284,17 @@ class FakeRing(Ring): def get_more_nodes(self, part): index_counter = itertools.count() for x in range(self.replicas, (self.replicas + self.max_more_nodes)): - yield {'ip': '10.0.0.%s' % x, - 'replication_ip': '10.0.0.%s' % x, - 'port': self._base_port + x, - 'replication_port': self._base_port + x, + ip = '10.0.0.%s' % x + port = self._base_port + x + if self.separate_replication: + repl_ip = '10.0.1.%s' % x + repl_port = port + 100 + else: + repl_ip, repl_port = ip, port + yield {'ip': ip, + 'replication_ip': repl_ip, + 'port': port, + 'replication_port': repl_port, 'device': 'sda', 'zone': x % 3, 'region': x % 2, @@ -1265,12 +1278,11 @@ def fake_ec_node_response(node_frags, policy): call_count = {} # maps node index to get_response call count for node def _build_node_map(req, policy): - node_key = lambda n: (n['ip'], n['port']) part = utils.split_path(req['path'], 5, 5, True)[1] all_nodes.extend(policy.object_ring.get_part_nodes(part)) all_nodes.extend(policy.object_ring.get_more_nodes(part)) for i, node in enumerate(all_nodes): - node_map[node_key(node)] = i + node_map[(node['ip'], node['port'])] = i call_count[i] = 0 # normalize node_frags to a list of fragments for each node even diff --git a/test/unit/account/test_reaper.py b/test/unit/account/test_reaper.py index d8b76eb9b2..60d0d28b2f 100644 --- a/test/unit/account/test_reaper.py +++ b/test/unit/account/test_reaper.py @@ -322,7 +322,8 @@ class TestReaper(unittest.TestCase): 'X-Container-Partition': 'partition', 'X-Container-Device': device, 'X-Backend-Storage-Policy-Index': policy.idx, - 'X-Timestamp': '1429117638.86767' + 'X-Timestamp': '1429117638.86767', + 'x-backend-use-replication-network': 'true', } ring = r.get_object_ring(policy.idx) expected = call(dict(ring.devs[i], index=i), 0, @@ -442,7 +443,8 @@ class TestReaper(unittest.TestCase): 'X-Account-Partition': 'partition', 'X-Account-Device': device, 'X-Account-Override-Deleted': 'yes', - 'X-Timestamp': '1429117639.67676' + 'X-Timestamp': '1429117639.67676', + 'x-backend-use-replication-network': 'true', } ring = r.get_object_ring(policy.idx) expected = call(dict(ring.devs[i], index=i), 0, 'a', 'c', diff --git a/test/unit/common/test_direct_client.py b/test/unit/common/test_direct_client.py index f15aab2883..faf2887cf6 100644 --- a/test/unit/common/test_direct_client.py +++ b/test/unit/common/test_direct_client.py @@ -316,6 +316,31 @@ class TestDirectClient(unittest.TestCase): self.assertIn('X-Timestamp', headers) self.assertIn('User-Agent', headers) + def test_direct_delete_account_replication_net(self): + part = '0' + account = 'a' + + mock_path = 'swift.common.bufferedhttp.http_connect_raw' + with mock.patch(mock_path) as fake_connect: + fake_connect.return_value.getresponse.return_value.status = 200 + direct_client.direct_delete_account( + self.node, part, account, + headers={'X-Backend-Use-Replication-Network': 't'}) + args, kwargs = fake_connect.call_args + ip = args[0] + self.assertEqual(self.node['replication_ip'], ip) + self.assertNotEqual(self.node['ip'], ip) + port = args[1] + self.assertEqual(self.node['replication_port'], port) + self.assertNotEqual(self.node['port'], port) + method = args[2] + self.assertEqual('DELETE', method) + path = args[3] + self.assertEqual('/sda/0/a', path) + headers = args[4] + self.assertIn('X-Timestamp', headers) + self.assertIn('User-Agent', headers) + def test_direct_delete_account_failure(self): part = '0' account = 'a' @@ -346,6 +371,24 @@ class TestDirectClient(unittest.TestCase): self.user_agent) self.assertEqual(headers, resp) + def test_direct_head_container_replication_net(self): + headers = HeaderKeyDict(key='value') + + with mocked_http_conn(200, headers) as conn: + resp = direct_client.direct_head_container( + self.node, self.part, self.account, self.container, + headers={'X-Backend-Use-Replication-Network': 'on'}) + self.assertEqual(conn.host, self.node['replication_ip']) + self.assertEqual(conn.port, self.node['replication_port']) + self.assertNotEqual(conn.host, self.node['ip']) + self.assertNotEqual(conn.port, self.node['port']) + self.assertEqual(conn.method, 'HEAD') + self.assertEqual(conn.path, self.container_path) + + self.assertEqual(conn.req_headers['user-agent'], + self.user_agent) + self.assertEqual(headers, resp) + def test_direct_head_container_error(self): headers = HeaderKeyDict(key='value') @@ -441,6 +484,18 @@ class TestDirectClient(unittest.TestCase): self.assertEqual(conn.method, 'DELETE') self.assertEqual(conn.path, self.container_path) + def test_direct_delete_container_replication_net(self): + with mocked_http_conn(200) as conn: + direct_client.direct_delete_container( + self.node, self.part, self.account, self.container, + headers={'X-Backend-Use-Replication-Network': '1'}) + self.assertEqual(conn.host, self.node['replication_ip']) + self.assertEqual(conn.port, self.node['replication_port']) + self.assertNotEqual(conn.host, self.node['ip']) + self.assertNotEqual(conn.port, self.node['port']) + self.assertEqual(conn.method, 'DELETE') + self.assertEqual(conn.path, self.container_path) + def test_direct_delete_container_with_timestamp(self): # ensure timestamp is different from any that might be auto-generated timestamp = Timestamp(time.time() - 100) diff --git a/test/unit/common/test_internal_client.py b/test/unit/common/test_internal_client.py index da055d7653..a57eae04a7 100644 --- a/test/unit/common/test_internal_client.py +++ b/test/unit/common/test_internal_client.py @@ -25,7 +25,7 @@ from textwrap import dedent import six from six.moves import range, zip_longest from six.moves.urllib.parse import quote, parse_qsl -from swift.common import exceptions, internal_client, swob +from swift.common import exceptions, internal_client, request_helpers, swob from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import StoragePolicy from swift.common.middleware.proxy_logging import ProxyLoggingMiddleware @@ -303,7 +303,7 @@ class TestInternalClient(unittest.TestCase): with mock.patch.object(internal_client, 'loadapp', app.load), \ self.assertRaises(ValueError): # First try with a bad arg - client = internal_client.InternalClient( + internal_client.InternalClient( conf_path, user_agent, request_tries=0) self.assertEqual(0, app.load_called) @@ -315,6 +315,18 @@ class TestInternalClient(unittest.TestCase): self.assertEqual(app, client.app) self.assertEqual(user_agent, client.user_agent) self.assertEqual(request_tries, client.request_tries) + self.assertFalse(client.use_replication_network) + + with mock.patch.object(internal_client, 'loadapp', app.load): + client = internal_client.InternalClient( + conf_path, user_agent, request_tries, + use_replication_network=True) + + self.assertEqual(2, app.load_called) + self.assertEqual(app, client.app) + self.assertEqual(user_agent, client.user_agent) + self.assertEqual(request_tries, client.request_tries) + self.assertTrue(client.use_replication_network) def test_make_request_sets_user_agent(self): class InternalClient(internal_client.InternalClient): @@ -323,8 +335,11 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 1 + self.use_replication_network = False def fake_app(self, env, start_response): + self.test.assertNotIn( + 'HTTP_X_BACKEND_USE_REPLICATION_NETWORK', env) self.test.assertEqual(self.user_agent, env['HTTP_USER_AGENT']) start_response('200 Ok', [('Content-Length', '0')]) return [] @@ -332,6 +347,47 @@ class TestInternalClient(unittest.TestCase): client = InternalClient(self) client.make_request('GET', '/', {}, (200,)) + def test_make_request_defaults_replication_network_header(self): + class InternalClient(internal_client.InternalClient): + def __init__(self, test): + self.test = test + self.app = self.fake_app + self.user_agent = 'some_agent' + self.request_tries = 1 + self.use_replication_network = False + self.expected_header_value = None + + def fake_app(self, env, start_response): + if self.expected_header_value is None: + self.test.assertNotIn( + 'HTTP_X_BACKEND_USE_REPLICATION_NETWORK', env) + else: + hdr_val = env['HTTP_X_BACKEND_USE_REPLICATION_NETWORK'] + self.test.assertEqual(self.expected_header_value, hdr_val) + start_response('200 Ok', [('Content-Length', '0')]) + return [] + + client = InternalClient(self) + client.make_request('GET', '/', {}, (200,)) + # Caller can still override + client.expected_header_value = 'false' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'false'}, (200,)) + client.expected_header_value = 'true' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'true'}, (200,)) + + # Switch default behavior + client.use_replication_network = True + + client.make_request('GET', '/', {}, (200,)) + client.expected_header_value = 'false' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'false'}, (200,)) + client.expected_header_value = 'on' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'on'}, (200,)) + def test_make_request_sets_query_string(self): captured_envs = [] @@ -341,6 +397,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 1 + self.use_replication_network = False def fake_app(self, env, start_response): captured_envs.append(env) @@ -362,6 +419,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 4 + self.use_replication_network = False self.tries = 0 self.sleep_called = 0 @@ -441,6 +499,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False self.env = None def fake_app(self, env, start_response): @@ -468,6 +527,7 @@ class TestInternalClient(unittest.TestCase): self.fake_app, {}, self.logger) self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): body = b'fake error response' @@ -499,6 +559,7 @@ class TestInternalClient(unittest.TestCase): self.user_agent = 'some_agent' self.resp_status = resp_status self.request_tries = 3 + self.use_replication_network = False self.closed_paths = [] self.fully_read_paths = [] @@ -557,6 +618,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('200 Ok', [('Content-Length', '0')]) @@ -607,6 +669,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False self.status = status self.call_count = 0 @@ -698,6 +761,7 @@ class TestInternalClient(unittest.TestCase): def __init__(self): self.user_agent = 'test' self.request_tries = 1 + self.use_replication_network = False self.app = self.fake_app def fake_app(self, environ, start_response): @@ -1217,6 +1281,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): self.req_env = env @@ -1261,6 +1326,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('200 Ok', [('Content-Length', '0')]) @@ -1280,6 +1346,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('200 Ok', [('Content-Length', '0')]) @@ -1300,6 +1367,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('404 Not Found', []) @@ -1330,6 +1398,7 @@ class TestInternalClient(unittest.TestCase): class InternalClient(internal_client.InternalClient): def __init__(self, test, path, headers, fobj): self.test = test + self.use_replication_network = False self.path = path self.headers = headers self.fobj = fobj diff --git a/test/unit/common/test_request_helpers.py b/test/unit/common/test_request_helpers.py index 70e75d53a0..14bdd04d2a 100644 --- a/test/unit/common/test_request_helpers.py +++ b/test/unit/common/test_request_helpers.py @@ -124,6 +124,19 @@ class TestRequestHelpers(unittest.TestCase): self.assertFalse('c' in to_req.headers) self.assertFalse('C' in to_req.headers) + def test_get_ip_port(self): + node = { + 'ip': '1.2.3.4', + 'port': 6000, + 'replication_ip': '5.6.7.8', + 'replication_port': 7000, + } + self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, {})) + self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, { + rh.USE_REPLICATION_NETWORK_HEADER: 'true'})) + self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, { + rh.USE_REPLICATION_NETWORK_HEADER: 'false'})) + @patch_policies(with_ec_default=True) def test_get_name_and_placement_object_req(self): path = '/device/part/account/container/object' diff --git a/test/unit/container/test_reconciler.py b/test/unit/container/test_reconciler.py index fc227b4b87..5fc26edeff 100644 --- a/test/unit/container/test_reconciler.py +++ b/test/unit/container/test_reconciler.py @@ -95,6 +95,7 @@ class FakeInternalClient(reconciler.InternalClient): self.app = FakeStoragePolicySwift() self.user_agent = 'fake-internal-client' self.request_tries = 1 + self.use_replication_network = True self.parse(listings) def parse(self, listings): diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index a54ddb652b..36e0795234 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -171,7 +171,8 @@ class TestSharder(BaseTestSharder): 'container-sharder', sharder.logger.logger.name) mock_ic.assert_called_once_with( '/etc/swift/internal-client.conf', 'Swift Container Sharder', 3, - allow_modify_pipeline=False) + allow_modify_pipeline=False, + use_replication_network=True) conf = { 'mount_check': False, 'bind_ip': '10.11.12.13', 'bind_port': 62010, @@ -221,7 +222,8 @@ class TestSharder(BaseTestSharder): sharder, mock_ic = do_test(conf, expected) mock_ic.assert_called_once_with( '/etc/swift/my-sharder-ic.conf', 'Swift Container Sharder', 2, - allow_modify_pipeline=False) + allow_modify_pipeline=False, + use_replication_network=True) self.assertEqual(self.logger.get_lines_for_level('warning'), [ 'Option auto_create_account_prefix is deprecated. ' 'Configure auto_create_account_prefix under the ' @@ -731,11 +733,12 @@ class TestSharder(BaseTestSharder): self.logger.clear() conf = conf or {} conf['devices'] = self.tempdir + fake_ring = FakeRing(replicas=replicas, separate_replication=True) with mock.patch( 'swift.container.sharder.internal_client.InternalClient'): with mock.patch( 'swift.common.db_replicator.ring.Ring', - lambda *args, **kwargs: FakeRing(replicas=replicas)): + return_value=fake_ring): sharder = ContainerSharder(conf, logger=self.logger) sharder._local_device_ids = {0, 1, 2} sharder._replicate_object = mock.MagicMock( @@ -4185,20 +4188,31 @@ class TestSharder(BaseTestSharder): def check_shard_ranges_sent(self, broker, expected_sent): bodies = [] + servers = [] def capture_send(conn, data): bodies.append(data) + def capture_connect(host, port, *a, **kw): + servers.append((host, port)) + self.assertFalse(broker.get_own_shard_range().reported) # sanity with self._mock_sharder() as sharder: with mocked_http_conn(204, 204, 204, - give_send=capture_send) as mock_conn: + give_send=capture_send, + give_connect=capture_connect) as mock_conn: sharder._update_root_container(broker) for req in mock_conn.requests: self.assertEqual('PUT', req['method']) self.assertEqual([expected_sent] * 3, [json.loads(b) for b in bodies]) + self.assertEqual(servers, [ + # NB: replication interfaces + ('10.0.1.0', 1100), + ('10.0.1.1', 1101), + ('10.0.1.2', 1102), + ]) self.assertTrue(broker.get_own_shard_range().reported) def test_update_root_container_own_range(self): diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 8ccdfbdffc..e9cebfe930 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -318,7 +318,7 @@ class TestController(unittest.TestCase): self.controller.account_info(self.account, self.request) set_http_connect(201, raise_timeout_exc=True) self.controller._make_request( - nodes, partition, 'POST', '/', '', '', None, + nodes, partition, 'POST', '/', {}, '', None, self.controller.app.logger.thread_locals) # tests if 200 is cached and used