From 8f60e0a2607514f05fb873e4a313ab4a93df7601 Mon Sep 17 00:00:00 2001
From: Clay Gerrard
Date: Fri, 28 Feb 2020 09:48:37 -0600
Subject: [PATCH] Extend concurrent_gets to EC GET requests

After the initial requests are started, if the proxy still does not
have enough backend responses to return a client response, additional
requests will be spawned to remaining primaries at the frequency
configured by the concurrency_timeout.

A new tunable concurrent_ec_extra_requests allows operators to control
how many requests to backend fragments are started immediately with a
client request to an object stored in an EC storage policy. By default
the minimum ndata backend requests are started immediately, but
operators may increase concurrent_ec_extra_requests up to nparity,
which is similar in effect to a concurrency_timeout of 0.

Change-Id: Ia0a9398107a400815be2e0097b1b8e76336a0253
---
 etc/proxy-server.conf-sample             |  21 +-
 swift/proxy/controllers/base.py          |  17 +-
 swift/proxy/controllers/obj.py           | 534 +++++++++++------------
 swift/proxy/server.py                    |   2 +
 test/unit/proxy/controllers/test_base.py | 108 ++++-
 test/unit/proxy/controllers/test_obj.py  | 500 +++++++++++++++++++--
 test/unit/proxy/test_server.py           |  29 +-
 7 files changed, 875 insertions(+), 336 deletions(-)

diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample
index 0387fde203..487e443b22 100644
--- a/etc/proxy-server.conf-sample
+++ b/etc/proxy-server.conf-sample
@@ -200,12 +200,14 @@ use = egg:swift#proxy
 # the number of seconds configured by timing_expiry.
 # timing_expiry = 300
 #
-# By default on a GET/HEAD swift will connect to a storage node one at a time
-# in a single thread. There is smarts in the order they are hit however. If you
-# turn on concurrent_gets below, then replica count threads will be used.
-# With addition of the concurrency_timeout option this will allow swift to send
-# out GET/HEAD requests to the storage nodes concurrently and answer with the
-# first to respond. With an EC policy the parameter only affects HEAD requests.
+# By default on a GET/HEAD swift will connect to a minimum number of storage
+# nodes in a minimum number of threads - for replicated data just a single
+# request to a single node at a time. When enabled, concurrent_gets allows the
+# proxy to use up to replica count threads when waiting on a response. In
+# conjunction with the concurrency_timeout option this will allow swift to send
+# out GET/HEAD requests to the storage nodes concurrently and answer as soon as
+# the minimum number of backend responses are available - in replicated contexts
+# this will be the first backend replica to respond.
 # concurrent_gets = off
 #
 # This parameter controls how long to wait before firing off the next
@@ -215,6 +217,13 @@ use = egg:swift#proxy
 # conn_timeout parameter.
 # concurrency_timeout = 0.5
 #
+# By default on an EC GET request swift will connect to a minimum number of
+# storage nodes in a minimum number of threads - for erasure coded data, ndata
+# requests to primary nodes are started at the same time. When greater than
+# zero this option provides additional robustness and may reduce first byte
+# latency by starting additional requests - up to as many as nparity.
+# concurrent_ec_extra_requests = 0
+#
 # Set to the number of nodes to contact for a normal request. You can use
 # '* replicas' at the end to have it use the number given times the number of
 # replicas for the ring being used for the request.
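Reviewer's sketch, not part of the patch: it restates how the new tunable
combines with ec_ndata to size the initial fan-out, following the
swift/proxy/server.py and swift/proxy/controllers/obj.py hunks below. The
10+4 policy geometry is assumed for illustration, and the nparity ceiling
mentioned in the sample config reads as operator guidance rather than a
limit clamped anywhere in the code shown here.

# Reviewer sketch only - mirrors the conf parsing added to swift/proxy/server.py
# and the ec_request_count calculation added to swift/proxy/controllers/obj.py.
conf = {'concurrent_ec_extra_requests': '2'}   # e.g. loaded from proxy-server.conf
concurrent_ec_extra_requests = int(conf.get('concurrent_ec_extra_requests', 0))

ec_ndata, ec_nparity = 10, 4                   # assumed 10+4 EC policy
ec_request_count = ec_ndata + concurrent_ec_extra_requests

# 12 fragment GETs start immediately; setting the option to nparity (4) would
# start all 14 primaries up front, much like a concurrency_timeout of 0.
assert ec_request_count == 12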
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 43ba6d695f..60e5ff7bac 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -1488,18 +1488,22 @@ class NodeIter(object): if node_iter is None: node_iter = itertools.chain( part_nodes, ring.get_more_nodes(partition)) - num_primary_nodes = len(part_nodes) - self.nodes_left = self.app.request_node_count(num_primary_nodes) - self.expected_handoffs = self.nodes_left - num_primary_nodes + self.num_primary_nodes = len(part_nodes) + self.nodes_left = self.app.request_node_count(self.num_primary_nodes) + self.expected_handoffs = self.nodes_left - self.num_primary_nodes # Use of list() here forcibly yanks the first N nodes (the primary # nodes) from node_iter, so the rest of its values are handoffs. self.primary_nodes = self.app.sort_nodes( - list(itertools.islice(node_iter, num_primary_nodes)), + list(itertools.islice(node_iter, self.num_primary_nodes)), policy=policy) self.handoff_iter = node_iter self._node_provider = None + @property + def primaries_left(self): + return len(self.primary_nodes) + def __iter__(self): self._node_iter = self._node_gen() return self @@ -1523,7 +1527,7 @@ class NodeIter(object): self.app.logger.increment('handoff_count') self.app.logger.warning( 'Handoff requested (%d)' % handoffs) - if (extra_handoffs == len(self.primary_nodes)): + if (extra_handoffs == self.num_primary_nodes): # all the primaries were skipped, and handoffs didn't help self.app.logger.increment('handoff_all_count') @@ -1539,7 +1543,8 @@ class NodeIter(object): self._node_provider = callback def _node_gen(self): - for node in self.primary_nodes: + while self.primary_nodes: + node = self.primary_nodes.pop(0) if not self.app.error_limited(node): yield node if not self.app.error_limited(node): diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 9f3d83b86f..6fa359d679 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -40,7 +40,7 @@ import sys from greenlet import GreenletExit from eventlet import GreenPile, sleep -from eventlet.queue import Queue +from eventlet.queue import Queue, Empty from eventlet.timeout import Timeout from swift import gettext_ as _ @@ -68,7 +68,7 @@ from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, ECDriverError, PolicyError) from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation, update_headers, bytes_to_skip, close_swift_conn, \ - ByteCountEnforcer, source_key + ByteCountEnforcer from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \ @@ -1400,7 +1400,7 @@ class ECAppIter(object): # killed by contextpool pass except ChunkReadTimeout: - # unable to resume in GetOrHeadHandler + # unable to resume in ECFragGetter self.logger.exception(_("Timeout fetching fragments for %r"), quote(self.path)) except: # noqa @@ -1984,13 +1984,13 @@ class ECGetResponseBucket(object): A helper class to encapsulate the properties of buckets in which fragment getters and alternate nodes are collected. 
""" - def __init__(self, policy, timestamp_str): + def __init__(self, policy, timestamp): """ :param policy: an instance of ECStoragePolicy - :param timestamp_str: a string representation of a timestamp + :param timestamp: a Timestamp, or None for a bucket of error reponses """ self.policy = policy - self.timestamp_str = timestamp_str + self.timestamp = timestamp self.gets = collections.defaultdict(list) self.alt_nodes = collections.defaultdict(list) self._durable = False @@ -2004,10 +2004,20 @@ class ECGetResponseBucket(object): return self._durable def add_response(self, getter, parts_iter): + """ + Add another response to this bucket. Response buckets can be for + fragments with the same timestamp, or for errors with the same status. + """ + headers = getter.last_headers + timestamp_str = headers.get('X-Backend-Timestamp', + headers.get('X-Timestamp')) + if timestamp_str: + # 404s will keep the most recent timestamp + self.timestamp = max(Timestamp(timestamp_str), self.timestamp) if not self.gets: - self.status = getter.last_status # stash first set of backend headers, which will be used to # populate a client response + self.status = getter.last_status # TODO: each bucket is for a single *data* timestamp, but sources # in the same bucket may have different *metadata* timestamps if # some backends have more recent .meta files than others. Currently @@ -2017,18 +2027,17 @@ class ECGetResponseBucket(object): # recent metadata. We could alternatively choose to the *newest* # metadata headers for self.headers by selecting the source with # the latest X-Timestamp. - self.headers = getter.last_headers - elif (self.timestamp_str is not None and # ie, not bad_bucket - getter.last_headers.get('X-Object-Sysmeta-Ec-Etag') != - self.headers.get('X-Object-Sysmeta-Ec-Etag')): + self.headers = headers + elif headers.get('X-Object-Sysmeta-Ec-Etag') != \ + self.headers.get('X-Object-Sysmeta-Ec-Etag'): # Fragments at the same timestamp with different etags are never - # expected. If somehow it happens then ignore those fragments - # to avoid mixing fragments that will not reconstruct otherwise - # an exception from pyeclib is almost certain. This strategy leaves - # a possibility that a set of consistent frags will be gathered. + # expected and error buckets shouldn't have this header. If somehow + # this happens then ignore those responses to avoid mixing + # fragments that will not reconstruct otherwise an exception from + # pyeclib is almost certain. raise ValueError("ETag mismatch") - frag_index = getter.last_headers.get('X-Object-Sysmeta-Ec-Frag-Index') + frag_index = headers.get('X-Object-Sysmeta-Ec-Frag-Index') frag_index = int(frag_index) if frag_index is not None else None self.gets[frag_index].append((getter, parts_iter)) @@ -2056,8 +2065,19 @@ class ECGetResponseBucket(object): @property def shortfall(self): - result = self.policy.ec_ndata - len(self.get_responses()) - return max(result, 0) + """ + The number of additional responses needed to complete this bucket; + typically (ndata - resp_count). + + If the bucket has no durable responses, shortfall is extended out to + replica count to ensure the proxy makes additional primary requests. 
+ """ + resp_count = len(self.get_responses()) + if self.durable or self.status == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE: + return max(self.policy.ec_ndata - resp_count, 0) + alt_count = min(self.policy.object_ring.replica_count - resp_count, + self.policy.ec_nparity) + return max([1, self.policy.ec_ndata - resp_count, alt_count]) @property def shortfall_with_alts(self): @@ -2070,7 +2090,7 @@ class ECGetResponseBucket(object): def __str__(self): # return a string summarising bucket state, useful for debugging. return '<%s, %s, %s, %s(%s), %s>' \ - % (self.timestamp_str, self.status, self._durable, + % (self.timestamp.internal, self.status, self._durable, self.shortfall, self.shortfall_with_alts, len(self.gets)) @@ -2092,15 +2112,24 @@ class ECGetResponseCollection(object): """ self.policy = policy self.buckets = {} + self.bad_buckets = {None: ECGetResponseBucket(self.policy, None)} self.node_iter_count = 0 - def _get_bucket(self, timestamp_str): + def _get_bucket(self, timestamp): """ - :param timestamp_str: a string representation of a timestamp + :param timestamp: a Timestamp :return: ECGetResponseBucket for given timestamp """ return self.buckets.setdefault( - timestamp_str, ECGetResponseBucket(self.policy, timestamp_str)) + timestamp, ECGetResponseBucket(self.policy, timestamp)) + + def _get_bad_bucket(self, status): + """ + :param status: a representation of status + :return: ECGetResponseBucket for given status + """ + return self.bad_buckets.setdefault( + status, ECGetResponseBucket(self.policy, None)) def add_response(self, get, parts_iter): """ @@ -2112,13 +2141,31 @@ class ECGetResponseCollection(object): :raises ValueError: if the response etag or status code values do not match any values previously received for the same timestamp """ + if is_success(get.last_status): + self.add_good_response(get, parts_iter) + else: + self.add_bad_resp(get, parts_iter) + + def add_bad_resp(self, get, parts_iter): + bad_bucket = self._get_bad_bucket(get.last_status) + bad_bucket.add_response(get, parts_iter) + + def add_good_response(self, get, parts_iter): headers = get.last_headers # Add the response to the appropriate bucket keyed by data file # timestamp. Fall back to using X-Backend-Timestamp as key for object # servers that have not been upgraded. t_data_file = headers.get('X-Backend-Data-Timestamp') t_obj = headers.get('X-Backend-Timestamp', headers.get('X-Timestamp')) - self._get_bucket(t_data_file or t_obj).add_response(get, parts_iter) + if t_data_file: + timestamp = Timestamp(t_data_file) + elif t_obj: + timestamp = Timestamp(t_obj) + else: + # Don't think this should ever come up in practice, + # but tests cover it + timestamp = None + self._get_bucket(timestamp).add_response(get, parts_iter) # The node may also have alternate fragments indexes (possibly at # different timestamps). For each list of alternate fragments indexes, @@ -2126,6 +2173,7 @@ class ECGetResponseCollection(object): # list to that bucket's alternate nodes. frag_sets = safe_json_loads(headers.get('X-Backend-Fragments')) or {} for t_frag, frag_set in frag_sets.items(): + t_frag = Timestamp(t_frag) self._get_bucket(t_frag).add_alternate_nodes(get.node, frag_set) # If the response includes a durable timestamp then mark that bucket as # durable. 
Note that this may be a different bucket than the one this @@ -2137,7 +2185,7 @@ class ECGetResponseCollection(object): # obj server not upgraded so assume this response's frag is durable t_durable = t_obj if t_durable: - self._get_bucket(t_durable).set_durable() + self._get_bucket(Timestamp(t_durable)).set_durable() def _sort_buckets(self): def key_fn(bucket): @@ -2150,35 +2198,65 @@ class ECGetResponseCollection(object): return (bucket.durable, bucket.shortfall <= 0, -1 * bucket.shortfall_with_alts, - bucket.timestamp_str) + bucket.timestamp) return sorted(self.buckets.values(), key=key_fn, reverse=True) @property def best_bucket(self): """ - Return the best bucket in the collection. + Return the "best" bucket in the collection. The "best" bucket is the newest timestamp with sufficient getters, or the closest to having sufficient getters, unless it is bettered by a bucket with potential alternate nodes. + If there are no good buckets we return the "least_bad" bucket. + :return: An instance of :class:`~ECGetResponseBucket` or None if there are no buckets in the collection. """ sorted_buckets = self._sort_buckets() - if sorted_buckets: - return sorted_buckets[0] - return None + for bucket in sorted_buckets: + # tombstones will set bad_bucket.timestamp + not_found_bucket = self.bad_buckets.get(404) + if not_found_bucket and not_found_bucket.timestamp and \ + bucket.timestamp < not_found_bucket.timestamp: + # "good bucket" is trumped by newer tombstone + continue + return bucket + return self.least_bad_bucket + + @property + def least_bad_bucket(self): + """ + Return the bad_bucket with the smallest shortfall + """ + # we want "enough" 416s to prevent "extra" requests - but we keep + # digging on 404s + short, status = min((bucket.shortfall, status) + for status, bucket in self.bad_buckets.items() + if status != 404) + return self.bad_buckets[status] + + @property + def shortfall(self): + best_bucket = self.best_bucket + shortfall = best_bucket.shortfall + return min(shortfall, self.least_bad_bucket.shortfall) + + @property + def durable(self): + return self.best_bucket.durable def _get_frag_prefs(self): # Construct the current frag_prefs list, with best_bucket prefs first. frag_prefs = [] for bucket in self._sort_buckets(): - if bucket.timestamp_str: + if bucket.timestamp: exclusions = [fi for fi in bucket.gets if fi is not None] - prefs = {'timestamp': bucket.timestamp_str, + prefs = {'timestamp': bucket.timestamp.internal, 'exclude': exclusions} frag_prefs.append(prefs) @@ -2237,44 +2315,34 @@ class ECGetResponseCollection(object): return nodes.pop(0).copy() +def is_good_source(status): + """ + Indicates whether or not the request made to the backend found + what it was looking for. 
+ + :param status: the response from the backend + :returns: True if found, False if not + """ + if status == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE: + return True + return is_success(status) or is_redirection(status) + + class ECFragGetter(object): - def __init__(self, app, req, server_type, node_iter, partition, path, - backend_headers, concurrency=1, client_chunk_size=None, - newest=None, header_provider=None): + + def __init__(self, app, req, node_iter, partition, policy, path, + backend_headers, header_provider=None): self.app = app + self.req = req self.node_iter = node_iter - self.server_type = server_type self.partition = partition self.path = path self.backend_headers = backend_headers - self.client_chunk_size = client_chunk_size + self.header_provider = header_provider + self.req_query_string = req.query_string + self.client_chunk_size = policy.fragment_size self.skip_bytes = 0 self.bytes_used_from_backend = 0 - self.used_nodes = [] - self.used_source_etag = '' - self.concurrency = concurrency - self.node = None - self.header_provider = header_provider - self.latest_404_timestamp = Timestamp(0) - - # stuff from request - self.req_method = req.method - self.req_path = req.path - self.req_query_string = req.query_string - if newest is None: - self.newest = config_true_value(req.headers.get('x-newest', 'f')) - else: - self.newest = newest - - # populated when finding source - self.statuses = [] - self.reasons = [] - self.bodies = [] - self.source_headers = [] - self.sources = [] - - # populated from response headers - self.start_byte = self.end_byte = self.length = None def fast_forward(self, num_bytes): """ @@ -2382,20 +2450,11 @@ class ECFragGetter(object): e if e is not None else '') for s, e in new_ranges))) - def is_good_source(self, src): - """ - Indicates whether or not the request made to the backend found - what it was looking for. - - :param src: the response from the backend - :returns: True if found, False if not - """ - if self.server_type == 'Object' and src.status == 416: - return True - return is_success(src.status) or is_redirection(src.status) - def response_parts_iter(self, req): - source, node = self._get_source_and_node() + try: + source, node = next(self.source_and_node_iter) + except StopIteration: + return it = None if source: it = self._get_response_parts_iter(req, node, source) @@ -2408,9 +2467,7 @@ class ECFragGetter(object): try: client_chunk_size = self.client_chunk_size - node_timeout = self.app.node_timeout - if self.server_type == 'Object': - node_timeout = self.app.recoverable_node_timeout + node_timeout = self.app.recoverable_node_timeout # This is safe; it sets up a generator but does not call next() # on it, so no IO is performed. 
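Reviewer's note, not part of the patch: a small worked example of the revised
ECGetResponseBucket.shortfall rule from the earlier obj.py hunk, assuming the
10+4 policy (14-replica object ring) that the new unit tests at the end of
this patch appear to use; the values reproduce the expectations asserted in
test_non_durable_ec_response_bucket.

# Reviewer sketch of the non-durable shortfall calculation, with the policy
# geometry (10 data + 4 parity on a 14-replica ring) assumed for illustration.
def non_durable_shortfall(resp_count, ec_ndata=10, ec_nparity=4,
                          replica_count=14):
    alt_count = min(replica_count - resp_count, ec_nparity)
    return max([1, ec_ndata - resp_count, alt_count])

# Even with ec_ndata responses in hand, a non-durable bucket still reports a
# shortfall, so the proxy keeps asking remaining primaries for a durable frag
# (or a newer tombstone) before answering the client.
assert [non_durable_shortfall(n) for n in range(10, 15)] == [4, 3, 2, 1, 1]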
@@ -2437,7 +2494,7 @@ class ECFragGetter(object): parts_iter[0]) return (start_byte, end_byte, length, headers, part) except ChunkReadTimeout: - new_source, new_node = self._get_source_and_node() + new_source, new_node = self._dig_for_source_and_node() if new_source: self.app.error_occurred( node[0], _('Trying to read object during ' @@ -2472,8 +2529,6 @@ class ECFragGetter(object): nbytes -= len(chunk) except (ChunkReadTimeout, ShortReadError): exc_type, exc_value, exc_traceback = sys.exc_info() - if self.newest or self.server_type != 'Object': - raise try: self.fast_forward(self.bytes_used_from_backend) except (HTTPException, ValueError): @@ -2481,7 +2536,7 @@ class ECFragGetter(object): except RangeAlreadyComplete: break buf = b'' - new_source, new_node = self._get_source_and_node() + new_source, new_node = self._dig_for_source_and_node() if new_source: self.app.error_occurred( node[0], _('Trying to read object during ' @@ -2627,33 +2682,26 @@ class ECFragGetter(object): @property def last_status(self): - if self.statuses: - return self.statuses[-1] - else: - return None + return self.status or HTTP_INTERNAL_SERVER_ERROR @property def last_headers(self): if self.source_headers: - return HeaderKeyDict(self.source_headers[-1]) + return HeaderKeyDict(self.source_headers) else: - return None + return HeaderKeyDict() def _make_node_request(self, node, node_timeout, logger_thread_locals): self.app.logger.thread_locals = logger_thread_locals - if node in self.used_nodes: - return False req_headers = dict(self.backend_headers) - # a request may be specialised with specific backend headers - if self.header_provider: - req_headers.update(self.header_provider()) ip, port = get_ip_port(node, req_headers) + req_headers.update(self.header_provider()) start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect( ip, port, node['device'], - self.partition, self.req_method, self.path, + self.partition, 'GET', self.path, headers=req_headers, query_string=self.req_query_string) self.app.set_node_timing(node, time.time() - start_node_timing) @@ -2664,134 +2712,69 @@ class ECFragGetter(object): possible_source.swift_conn = conn except (Exception, Timeout): self.app.exception_occurred( - node, self.server_type, + node, 'Object', _('Trying to %(method)s %(path)s') % - {'method': self.req_method, 'path': self.req_path}) - return False + {'method': self.req.method, 'path': self.req.path}) + return None src_headers = dict( (k.lower(), v) for k, v in possible_source.getheaders()) - if self.is_good_source(possible_source): - # 404 if we know we don't have a synced copy - if not float(possible_source.getheader('X-PUT-Timestamp', 1)): - self.statuses.append(HTTP_NOT_FOUND) - self.reasons.append('') - self.bodies.append('') - self.source_headers.append([]) - close_swift_conn(possible_source) - else: - if self.used_source_etag and \ - self.used_source_etag != normalize_etag(src_headers.get( - 'x-object-sysmeta-ec-etag', - src_headers.get('etag', ''))): - self.statuses.append(HTTP_NOT_FOUND) - self.reasons.append('') - self.bodies.append('') - self.source_headers.append([]) - return False - # a possible source should only be added as a valid source - # if its timestamp is newer than previously found tombstones - ps_timestamp = Timestamp( - src_headers.get('x-backend-data-timestamp') or - src_headers.get('x-backend-timestamp') or - src_headers.get('x-put-timestamp') or - src_headers.get('x-timestamp') or 0) - if ps_timestamp >= self.latest_404_timestamp: - 
self.statuses.append(possible_source.status) - self.reasons.append(possible_source.reason) - self.bodies.append(None) - self.source_headers.append(possible_source.getheaders()) - self.sources.append((possible_source, node)) - if not self.newest: # one good source is enough - return True + if 'handoff_index' in node and \ + (is_server_error(possible_source.status) or + possible_source.status == HTTP_NOT_FOUND) and \ + not Timestamp(src_headers.get('x-backend-timestamp', 0)): + # throw out 5XX and 404s from handoff nodes unless the data is + # really on disk and had been DELETEd + return None + + self.status = possible_source.status + self.reason = possible_source.reason + self.source_headers = possible_source.getheaders() + if is_good_source(possible_source.status): + self.body = None + return possible_source else: - if 'handoff_index' in node and \ - (is_server_error(possible_source.status) or - possible_source.status == HTTP_NOT_FOUND) and \ - not Timestamp(src_headers.get('x-backend-timestamp', 0)): - # throw out 5XX and 404s from handoff nodes unless the data is - # really on disk and had been DELETEd - return False - self.statuses.append(possible_source.status) - self.reasons.append(possible_source.reason) - self.bodies.append(possible_source.read()) - self.source_headers.append(possible_source.getheaders()) + self.body = possible_source.read() - # if 404, record the timestamp. If a good source shows up, its - # timestamp will be compared to the latest 404. - # For now checking only on objects, but future work could include - # the same check for account and containers. See lp 1560574. - if self.server_type == 'Object' and \ - possible_source.status == HTTP_NOT_FOUND: - hdrs = HeaderKeyDict(possible_source.getheaders()) - ts = Timestamp(hdrs.get('X-Backend-Timestamp', 0)) - if ts > self.latest_404_timestamp: - self.latest_404_timestamp = ts if possible_source.status == HTTP_INSUFFICIENT_STORAGE: self.app.error_limit(node, _('ERROR Insufficient Storage')) elif is_server_error(possible_source.status): self.app.error_occurred( node, _('ERROR %(status)d %(body)s ' - 'From %(type)s Server') % + 'From Object Server') % {'status': possible_source.status, - 'body': self.bodies[-1][:1024], - 'type': self.server_type}) - return False + 'body': self.body[:1024]}) + return None - def _get_source_and_node(self): - self.statuses = [] - self.reasons = [] - self.bodies = [] - self.source_headers = [] - self.sources = [] + @property + def source_and_node_iter(self): + if not hasattr(self, '_source_and_node_iter'): + self._source_and_node_iter = self._source_and_node_gen() + return self._source_and_node_iter - nodes = GreenthreadSafeIterator(self.node_iter) + def _source_and_node_gen(self): + self.status = self.reason = self.body = self.source_headers = None + for node in self.node_iter: + source = self._make_node_request( + node, self.app.recoverable_node_timeout, + self.app.logger.thread_locals) - node_timeout = self.app.node_timeout - if self.server_type == 'Object' and not self.newest: - node_timeout = self.app.recoverable_node_timeout + if source: + self.node = node + yield source, node + else: + yield None, None + self.status = self.reason = self.body = self.source_headers = None - pile = GreenAsyncPile(self.concurrency) - - for node in nodes: - pile.spawn(self._make_node_request, node, node_timeout, - self.app.logger.thread_locals) - _timeout = self.app.concurrency_timeout \ - if pile.inflight < self.concurrency else None - if pile.waitfirst(_timeout): - break - else: - # ran out of nodes, see if 
any stragglers will finish - any(pile) - - # this helps weed out any sucess status that were found before a 404 - # and added to the list in the case of x-newest. - if self.sources: - self.sources = [s for s in self.sources - if source_key(s[0]) >= self.latest_404_timestamp] - - if self.sources: - self.sources.sort(key=lambda s: source_key(s[0])) - source, node = self.sources.pop() - for src, _junk in self.sources: - close_swift_conn(src) - self.used_nodes.append(node) - src_headers = dict( - (k.lower(), v) for k, v in - source.getheaders()) - - # Save off the source etag so that, if we lose the connection - # and have to resume from a different node, we can be sure that - # we have the same object (replication) or a fragment archive - # from the same object (EC). Otherwise, if the cluster has two - # versions of the same object, we might end up switching between - # old and new mid-stream and giving garbage to the client. - self.used_source_etag = normalize_etag(src_headers.get( - 'x-object-sysmeta-ec-etag', src_headers.get('etag', ''))) - self.node = node - return source, node + def _dig_for_source_and_node(self): + # capture last used etag before continuation + used_etag = self.last_headers.get('X-Object-Sysmeta-EC-ETag') + for source, node in self.source_and_node_iter: + if source and is_good_source(source.status) and \ + source.getheader('X-Object-Sysmeta-EC-ETag') == used_etag: + return source, node return None, None @@ -2805,11 +2788,9 @@ class ECObjectController(BaseObjectController): backend_headers = self.generate_request_headers( req, additional=req.headers) - getter = ECFragGetter(self.app, req, 'Object', node_iter, - partition, req.swift_entity_path, - backend_headers, - client_chunk_size=policy.fragment_size, - newest=False, header_provider=header_provider) + getter = ECFragGetter(self.app, req, node_iter, partition, + policy, req.swift_entity_path, backend_headers, + header_provider=header_provider) return (getter, getter.response_parts_iter(req)) def _convert_range(self, req, policy): @@ -2864,6 +2845,25 @@ class ECObjectController(BaseObjectController): for s, e in new_ranges) return range_specs + def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy, + buckets, feeder_q): + while True: + try: + feeder_q.get(timeout=self.app.concurrency_timeout) + except Empty: + if safe_iter.unsafe_iter.primaries_left: + # this will run async, if it ends up taking the last + # primary we won't find out until the next pass + pile.spawn(self._fragment_GET_request, + req, safe_iter, partition, + policy, buckets.get_extra_headers) + else: + # ran out of primaries + break + else: + # got a stop + break + def _get_or_head_response(self, req, node_iter, partition, policy): update_etag_is_at_header(req, "X-Object-Sysmeta-Ec-Etag") @@ -2887,27 +2887,24 @@ class ECObjectController(BaseObjectController): safe_iter = GreenthreadSafeIterator(node_iter) - # Sending the request concurrently to all nodes, and responding - # with the first response isn't something useful for EC as all - # nodes contain different fragments. Also EC has implemented it's - # own specific implementation of concurrent gets to ec_ndata nodes. - # So we don't need to worry about plumbing and sending a - # concurrency value to ECFragGetter. 
- with ContextPool(policy.ec_ndata) as pool: + ec_request_count = policy.ec_ndata + \ + self.app.concurrent_ec_extra_requests + with ContextPool(ec_request_count) as pool: pile = GreenAsyncPile(pool) buckets = ECGetResponseCollection(policy) node_iter.set_node_provider(buckets.provide_alternate_node) - # include what may well be an empty X-Backend-Fragment-Preferences - # header from the buckets.get_extra_headers to let the object - # server know that it is ok to return non-durable fragments - for _junk in range(policy.ec_ndata): + + for node_count in range(ec_request_count): pile.spawn(self._fragment_GET_request, req, safe_iter, partition, policy, buckets.get_extra_headers) - bad_bucket = ECGetResponseBucket(policy, None) - bad_bucket.set_durable() - best_bucket = None + feeder_q = None + if self.app.concurrent_gets: + feeder_q = Queue() + pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req, + partition, policy, buckets, feeder_q) + extra_requests = 0 # max_extra_requests is an arbitrary hard limit for spawning extra # getters in case some unforeseen scenario, or a misbehaving object @@ -2917,52 +2914,33 @@ class ECObjectController(BaseObjectController): # be limit at most 2 * replicas. max_extra_requests = ( (policy.object_ring.replica_count * 2) - policy.ec_ndata) - for get, parts_iter in pile: - if get.last_status is None: - # We may have spawned getters that find the node iterator - # has been exhausted. Ignore them. - # TODO: turns out that node_iter.nodes_left can bottom - # out at >0 when number of devs in ring is < 2* replicas, - # which definitely happens in tests and results in status - # of None. We should fix that but keep this guard because - # there is also a race between testing nodes_left/spawning - # a getter and an existing getter calling next(node_iter). - continue try: - if is_success(get.last_status): - # 2xx responses are managed by a response collection - buckets.add_response(get, parts_iter) - else: - # all other responses are lumped into a single bucket - bad_bucket.add_response(get, parts_iter) + buckets.add_response(get, parts_iter) except ValueError as err: self.app.logger.error( _("Problem with fragment response: %s"), err) - shortfall = bad_bucket.shortfall best_bucket = buckets.best_bucket - if best_bucket: - shortfall = best_bucket.shortfall - if not best_bucket.durable and shortfall <= 0: - # be willing to go a *little* deeper, slowly - shortfall = 1 - shortfall = min(shortfall, bad_bucket.shortfall) - if (extra_requests < max_extra_requests and - shortfall > pile._pending and - (node_iter.nodes_left > 0 or - buckets.has_alternate_node())): - # we need more matching responses to reach ec_ndata - # than we have pending gets, as long as we still have - # nodes in node_iter we can spawn another + if best_bucket.durable and best_bucket.shortfall <= 0: + # good enough! 
+ break + requests_available = extra_requests < max_extra_requests and ( + node_iter.nodes_left > 0 or buckets.has_alternate_node()) + bad_resp = not is_good_source(get.last_status) + if requests_available and ( + buckets.shortfall > pile._pending or bad_resp): extra_requests += 1 - pile.spawn(self._fragment_GET_request, req, - safe_iter, partition, policy, - buckets.get_extra_headers) + pile.spawn(self._fragment_GET_request, + req, safe_iter, partition, + policy, buckets.get_extra_headers) + if feeder_q: + feeder_q.put('stop') # Put this back, since we *may* need it for kickoff()/_fix_response() # (but note that _fix_ranges() may also pop it back off before then) req.range = orig_range - if best_bucket and best_bucket.shortfall <= 0 and best_bucket.durable: + best_bucket = buckets.best_bucket + if best_bucket.shortfall <= 0 and best_bucket.durable: # headers can come from any of the getters resp_headers = best_bucket.headers resp_headers.pop('Content-Range', None) @@ -2975,8 +2953,7 @@ class ECObjectController(BaseObjectController): app_iter = ECAppIter( req.swift_entity_path, policy, - [parts_iter for - _getter, parts_iter in best_bucket.get_responses()], + [p_iter for _getter, p_iter in best_bucket.get_responses()], range_specs, fa_length, obj_length, self.app.logger) resp = Response( @@ -3002,25 +2979,28 @@ class ECObjectController(BaseObjectController): reasons = [] bodies = [] headers = [] - for getter, _parts_iter in bad_bucket.get_responses(): - if best_bucket and best_bucket.durable: - bad_resp_headers = HeaderKeyDict(getter.last_headers) - t_data_file = bad_resp_headers.get( - 'X-Backend-Data-Timestamp') - t_obj = bad_resp_headers.get( - 'X-Backend-Timestamp', - bad_resp_headers.get('X-Timestamp')) - bad_ts = Timestamp(t_data_file or t_obj or '0') - if bad_ts <= Timestamp(best_bucket.timestamp_str): - # We have reason to believe there's still good data - # out there, it's just currently unavailable - continue - statuses.extend(getter.statuses) - reasons.extend(getter.reasons) - bodies.extend(getter.bodies) - headers.extend(getter.source_headers) + for status, bad_bucket in buckets.bad_buckets.items(): + for getter, _parts_iter in bad_bucket.get_responses(): + if best_bucket.durable: + bad_resp_headers = getter.last_headers + t_data_file = bad_resp_headers.get( + 'X-Backend-Data-Timestamp') + t_obj = bad_resp_headers.get( + 'X-Backend-Timestamp', + bad_resp_headers.get('X-Timestamp')) + bad_ts = Timestamp(t_data_file or t_obj or '0') + if bad_ts <= best_bucket.timestamp: + # We have reason to believe there's still good data + # out there, it's just currently unavailable + continue + if getter.status: + statuses.append(getter.status) + reasons.append(getter.reason) + bodies.append(getter.body) + headers.append(getter.source_headers) - if not statuses and best_bucket and not best_bucket.durable: + if not statuses and is_success(best_bucket.status) and \ + not best_bucket.durable: # pretend that non-durable bucket was 404s statuses.append(404) reasons.append('404 Not Found') diff --git a/swift/proxy/server.py b/swift/proxy/server.py index b130852413..1a0928f692 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -263,6 +263,8 @@ class Application(object): self.concurrent_gets = config_true_value(conf.get('concurrent_gets')) self.concurrency_timeout = float(conf.get('concurrency_timeout', self.conn_timeout)) + self.concurrent_ec_extra_requests = int( + conf.get('concurrent_ec_extra_requests', 0)) value = conf.get('request_node_count', '2 * replicas').lower().split() if 
len(value) == 1: rnc_value = int(value[0]) diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 6464dbd6df..fe0d877a5b 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -21,29 +21,27 @@ import mock import six +from swift.proxy import server as proxy_server from swift.proxy.controllers.base import headers_to_container_info, \ headers_to_account_info, headers_to_object_info, get_container_info, \ get_cache_key, get_account_info, get_info, get_object_info, \ Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \ - set_info_cache + set_info_cache, NodeIter from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \ bytes_to_wsgi from swift.common import exceptions -from swift.common.utils import split_path, ShardRange, Timestamp +from swift.common.utils import split_path, ShardRange, Timestamp, \ + GreenthreadSafeIterator, GreenAsyncPile from swift.common.header_key_dict import HeaderKeyDict from swift.common.http import is_success from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection from test.unit import ( - fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, FakeLogger, - make_timestamp_iter, - mocked_http_conn) -from swift.proxy import server as proxy_server + fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, + make_timestamp_iter, mocked_http_conn, patch_policies, debug_logger) from swift.common.request_helpers import ( get_sys_meta_prefix, get_object_transient_sysmeta ) -from test.unit import patch_policies - class FakeResponse(object): @@ -179,13 +177,22 @@ class FakeCache(FakeMemcache): return self.stub or self.store.get(key) -@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) -class TestFuncs(unittest.TestCase): +class BaseTest(unittest.TestCase): + def setUp(self): - self.app = proxy_server.Application(None, - account_ring=FakeRing(), - container_ring=FakeRing(), - logger=FakeLogger()) + self.logger = debug_logger() + self.cache = FakeCache() + self.conf = {} + self.account_ring = FakeRing() + self.container_ring = FakeRing() + self.app = proxy_server.Application(self.conf, + logger=self.logger, + account_ring=self.account_ring, + container_ring=self.container_ring) + + +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) +class TestFuncs(BaseTest): def test_get_info_zero_recheck(self): mock_cache = mock.Mock() @@ -1325,3 +1332,76 @@ class TestFuncs(unittest.TestCase): self.assertIn('Failed to get container listing', warning_lines[0]) self.assertIn('/a/c', warning_lines[0]) self.assertFalse(warning_lines[1:]) + + +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) +class TestNodeIter(BaseTest): + + def test_iter_default_fake_ring(self): + for ring in (self.account_ring, self.container_ring): + self.assertEqual(ring.replica_count, 3.0) + node_iter = NodeIter(self.app, ring, 0) + self.assertEqual(6, node_iter.nodes_left) + self.assertEqual(3, node_iter.primaries_left) + count = 0 + for node in node_iter: + count += 1 + self.assertEqual(count, 3) + self.assertEqual(0, node_iter.primaries_left) + # default fake_ring has NO handoffs, so nodes_left is kind of a lie + self.assertEqual(3, node_iter.nodes_left) + + def test_iter_with_handoffs(self): + ring = FakeRing(replicas=3, max_more_nodes=20) # handoffs available + policy = StoragePolicy(0, 'zero', object_ring=ring) + node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy) + self.assertEqual(6, 
node_iter.nodes_left) + self.assertEqual(3, node_iter.primaries_left) + primary_indexes = set() + handoff_indexes = [] + count = 0 + for node in node_iter: + if 'index' in node: + primary_indexes.add(node['index']) + else: + handoff_indexes.append(node['handoff_index']) + count += 1 + self.assertEqual(count, 6) + self.assertEqual(0, node_iter.primaries_left) + self.assertEqual(0, node_iter.nodes_left) + self.assertEqual({0, 1, 2}, primary_indexes) + self.assertEqual([0, 1, 2], handoff_indexes) + + def test_multi_iteration(self): + ring = FakeRing(replicas=8, max_more_nodes=20) + policy = StoragePolicy(0, 'ec', object_ring=ring) + + # sanity + node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy) + self.assertEqual(16, len([n for n in node_iter])) + + node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy) + self.assertEqual(16, node_iter.nodes_left) + self.assertEqual(8, node_iter.primaries_left) + pile = GreenAsyncPile(5) + + def eat_node(node_iter): + return next(node_iter) + + safe_iter = GreenthreadSafeIterator(node_iter) + for i in range(5): + pile.spawn(eat_node, safe_iter) + + nodes = [] + for node in pile: + nodes.append(node) + + primary_indexes = {n['index'] for n in nodes} + self.assertEqual(5, len(primary_indexes)) + self.assertEqual(3, node_iter.primaries_left) + + # it's problematic we don't decrement nodes_left until we resume + self.assertEqual(12, node_iter.nodes_left) + for node in node_iter: + nodes.append(node) + self.assertEqual(17, len(nodes)) diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index c859954a5f..fb84af7fd9 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -26,7 +26,8 @@ import json from hashlib import md5 import mock -from eventlet import Timeout +from eventlet import Timeout, sleep +from eventlet.queue import Empty import six from six import StringIO @@ -2411,6 +2412,249 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 200) + def test_GET_no_response_error(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + with set_http_connect(): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def test_feed_remaining_primaries(self): + controller = self.controller_cls( + self.app, 'a', 'c', 'o') + safe_iter = utils.GreenthreadSafeIterator(self.app.iter_nodes( + self.policy.object_ring, 0, policy=self.policy)) + controller._fragment_GET_request = lambda *a, **k: next(safe_iter) + pile = utils.GreenAsyncPile(self.policy.ec_ndata) + for i in range(self.policy.ec_ndata): + pile.spawn(controller._fragment_GET_request) + req = swob.Request.blank('/v1/a/c/o') + + feeder_q = mock.MagicMock() + + def feeder_timeout(*a, **kw): + # simulate trampoline + sleep() + # timeout immediately + raise Empty + feeder_q.get.side_effect = feeder_timeout + controller.feed_remaining_primaries( + safe_iter, pile, req, 0, self.policy, mock.MagicMock(), feeder_q) + expected_call = mock.call(timeout=self.app.concurrency_timeout) + expected_num_calls = self.policy.ec_nparity + 1 + self.assertEqual(feeder_q.get.call_args_list, + [expected_call] * expected_num_calls) + + def test_GET_timeout(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + self.app.recoverable_node_timeout = 0.01 + codes = [FakeStatus(404, response_sleep=1.0)] + \ + [200] * (self.policy.ec_ndata) + with mocked_http_conn(*codes) as log: + resp = 
req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(self.policy.ec_ndata + 1, len(log.requests)) + + def test_GET_with_slow_primaries(self): + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-743] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + ts = self.ts() + headers = [] + for i, body in enumerate(ec_archive_bodies): + headers.append({ + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(body), + 'X-Object-Sysmeta-Ec-Frag-Index': + self.policy.get_backend_index(i), + 'X-Backend-Timestamp': ts.internal, + 'X-Timestamp': ts.normal, + 'X-Backend-Durable-Timestamp': ts.internal, + 'X-Backend-Data-Timestamp': ts.internal, + }) + + req = swift.common.swob.Request.blank('/v1/a/c/o') + + self.app.concurrent_gets = True + self.app.concurrency_timeout = 0.01 + status_codes = ([ + FakeStatus(200, response_sleep=2.0), + ] * self.policy.ec_nparity) + ([ + FakeStatus(200), + ] * self.policy.ec_ndata) + self.assertEqual(len(status_codes), len(ec_archive_bodies)) + with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, + headers=headers) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(len(log.requests), + self.policy.ec_n_unique_fragments) + + def test_GET_with_some_slow_primaries(self): + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-289] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + ts = self.ts() + headers = [] + for i, body in enumerate(ec_archive_bodies): + headers.append({ + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(body), + 'X-Object-Sysmeta-Ec-Frag-Index': + self.policy.get_backend_index(i), + 'X-Backend-Timestamp': ts.internal, + 'X-Timestamp': ts.normal, + 'X-Backend-Durable-Timestamp': ts.internal, + 'X-Backend-Data-Timestamp': ts.internal, + }) + + req = swift.common.swob.Request.blank('/v1/a/c/o') + + self.app.concurrent_gets = True + self.app.concurrency_timeout = 0.01 + slow_count = self.policy.ec_nparity + status_codes = ([ + FakeStatus(200, response_sleep=2.0), + ] * slow_count) + ([ + FakeStatus(200), + ] * (self.policy.ec_ndata - slow_count)) + random.shuffle(status_codes) + status_codes.extend([ + FakeStatus(200), + ] * slow_count) + self.assertEqual(len(status_codes), len(ec_archive_bodies)) + with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, + headers=headers) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(len(log.requests), + self.policy.ec_n_unique_fragments) + + def test_GET_with_slow_nodes_and_failures(self): + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-289] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + ts = self.ts() + headers = [] + for i, body in enumerate(ec_archive_bodies): + headers.append({ + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(body), + 'X-Object-Sysmeta-Ec-Frag-Index': + self.policy.get_backend_index(i), + 'X-Backend-Timestamp': ts.internal, + 'X-Timestamp': ts.normal, + 'X-Backend-Durable-Timestamp': ts.internal, + 'X-Backend-Data-Timestamp': ts.internal, + }) + + req = swift.common.swob.Request.blank('/v1/a/c/o') + + self.app.concurrent_gets = True + self.app.concurrency_timeout = 0.01 + unused_resp = [ + FakeStatus(200, 
response_sleep=2.0), + FakeStatus(200, response_sleep=2.0), + 500, + 416, + ] + self.assertEqual(len(unused_resp), self.policy.ec_nparity) + status_codes = ( + [200] * (self.policy.ec_ndata - 4)) + unused_resp + self.assertEqual(len(status_codes), self.policy.ec_ndata) + # random.shuffle(status_codes) + # make up for the failures + status_codes.extend([200] * self.policy.ec_nparity) + self.assertEqual(len(status_codes), len(ec_archive_bodies)) + bodies_with_errors = [] + for code, body in zip(status_codes, ec_archive_bodies): + if code == 500: + bodies_with_errors.append('Kaboom') + elif code == 416: + bodies_with_errors.append('That Range is no.') + else: + bodies_with_errors.append(body) + with mocked_http_conn(*status_codes, body_iter=bodies_with_errors, + headers=headers) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(len(log.requests), + self.policy.ec_n_unique_fragments) + + def test_GET_with_one_slow_frag_lane(self): + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-454] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + ts = self.ts() + headers = [] + for i, body in enumerate(ec_archive_bodies): + headers.append({ + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(body), + 'X-Object-Sysmeta-Ec-Frag-Index': + self.policy.get_backend_index(i), + 'X-Backend-Timestamp': ts.internal, + 'X-Timestamp': ts.normal, + 'X-Backend-Durable-Timestamp': ts.internal, + 'X-Backend-Data-Timestamp': ts.internal, + }) + + req = swift.common.swob.Request.blank('/v1/a/c/o') + + self.app.concurrent_gets = True + self.app.concurrency_timeout = 0.01 + status_codes = [ + FakeStatus(200, response_sleep=2.0), + ] + ([ + FakeStatus(200), + ] * (self.policy.ec_ndata - 1)) + random.shuffle(status_codes) + status_codes.extend([ + FakeStatus(200, response_sleep=2.0), + FakeStatus(200, response_sleep=2.0), + FakeStatus(200, response_sleep=2.0), + FakeStatus(200), + ]) + self.assertEqual(len(status_codes), len(ec_archive_bodies)) + with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, + headers=headers) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(len(log.requests), + self.policy.ec_n_unique_fragments) + + def test_GET_with_concurrent_ec_extra_requests(self): + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-454] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + ts = self.ts() + headers = [] + for i, body in enumerate(ec_archive_bodies): + headers.append({ + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(body), + 'X-Object-Sysmeta-Ec-Frag-Index': + self.policy.get_backend_index(i), + 'X-Backend-Timestamp': ts.internal, + 'X-Timestamp': ts.normal, + 'X-Backend-Durable-Timestamp': ts.internal, + 'X-Backend-Data-Timestamp': ts.internal, + }) + self.app.concurrent_ec_extra_requests = self.policy.ec_nparity - 1 + req = swift.common.swob.Request.blank('/v1/a/c/o') + status_codes = [200] * (self.policy.object_ring.replicas - 1) + with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, + headers=headers) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(len(log.requests), + self.policy.object_ring.replicas - 1) + self.assertEqual(resp.body, test_data) + def test_GET_with_body(self): req = 
swift.common.swob.Request.blank('/v1/a/c/o') # turn a real body into fragments @@ -2576,7 +2820,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): fake_response = self._fake_ec_node_response(node_frags) req = swob.Request.blank('/v1/a/c/o') - with capture_http_requests(fake_response) as log: + with mock.patch('swift.proxy.server.shuffle', lambda n: n), \ + capture_http_requests(fake_response) as log: resp = req.get_response(self.app) self.assertEqual(resp.status_int, 200) @@ -2589,17 +2834,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index'] collected_responses[etag].add(index) - # because the primary nodes are shuffled, it's possible the proxy - # didn't even notice the missed overwrite frag - but it might have - self.assertLessEqual(len(log), self.policy.ec_ndata + 1) - self.assertLessEqual(len(collected_responses), 2) - - # ... regardless we should never need to fetch more than ec_ndata - # frags for any given etag - for etag, frags in collected_responses.items(): - self.assertLessEqual(len(frags), self.policy.ec_ndata, - 'collected %s frags for etag %s' % ( - len(frags), etag)) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + expected = { + obj1['etag']: 1, + obj2['etag']: self.policy.ec_ndata, + } + self.assertEqual(expected, { + e: len(f) for e, f in collected_responses.items()}) def test_GET_with_many_missed_overwrite_will_need_handoff(self): obj1 = self._make_ec_object_stub(pattern='obj1') @@ -2857,7 +3098,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): collected_indexes[fi].append(conn) self.assertEqual(len(collected_indexes), 7) - def test_GET_with_mixed_nondurable_frags_and_no_quorum_will_503(self): + def test_GET_with_mixed_nondurable_frags_and_will_404(self): # all nodes have a frag but there is no one set that reaches quorum, # which means there is no backend 404 response, but proxy should still # return 404 rather than 503 @@ -2919,10 +3160,72 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): collected_etags) self.assertEqual({200}, collected_status) - def test_GET_with_mixed_frags_and_no_quorum_will_503(self): + def test_GET_with_mixed_durable_and_nondurable_frags_will_503(self): # all nodes have a frag but there is no one set that reaches quorum, - # but since they're all marked durable (so we *should* be able to - # reconstruct), proxy will 503 + # but since one is marked durable we *should* be able to reconstruct, + # so proxy should 503 + obj1 = self._make_ec_object_stub(pattern='obj1') + obj2 = self._make_ec_object_stub(pattern='obj2') + obj3 = self._make_ec_object_stub(pattern='obj3') + obj4 = self._make_ec_object_stub(pattern='obj4') + + node_frags = [ + {'obj': obj1, 'frag': 0, 'durable': False}, + {'obj': obj2, 'frag': 0, 'durable': False}, + {'obj': obj3, 'frag': 0, 'durable': False}, + {'obj': obj1, 'frag': 1, 'durable': False}, + {'obj': obj2, 'frag': 1, 'durable': False}, + {'obj': obj3, 'frag': 1, 'durable': False}, + {'obj': obj1, 'frag': 2, 'durable': False}, + {'obj': obj2, 'frag': 2, 'durable': False}, + {'obj': obj3, 'frag': 2, 'durable': False}, + {'obj': obj1, 'frag': 3, 'durable': False}, + {'obj': obj2, 'frag': 3, 'durable': False}, + {'obj': obj3, 'frag': 3, 'durable': False}, + {'obj': obj1, 'frag': 4, 'durable': False}, + {'obj': obj2, 'frag': 4, 'durable': False}, + {'obj': obj3, 'frag': 4, 'durable': False}, + {'obj': obj1, 'frag': 5, 'durable': False}, + {'obj': obj2, 'frag': 5, 
'durable': False}, + {'obj': obj3, 'frag': 5, 'durable': False}, + {'obj': obj1, 'frag': 6, 'durable': False}, + {'obj': obj2, 'frag': 6, 'durable': False}, + {'obj': obj3, 'frag': 6, 'durable': False}, + {'obj': obj1, 'frag': 7, 'durable': False}, + {'obj': obj2, 'frag': 7, 'durable': False}, + {'obj': obj3, 'frag': 7}, + {'obj': obj1, 'frag': 8, 'durable': False}, + {'obj': obj2, 'frag': 8, 'durable': False}, + {'obj': obj3, 'frag': 8, 'durable': False}, + {'obj': obj4, 'frag': 8, 'durable': False}, + ] + + fake_response = self._fake_ec_node_response(node_frags) + + req = swob.Request.blank('/v1/a/c/o') + with capture_http_requests(fake_response) as log: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 503) + + collected_etags = set() + collected_status = set() + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + collected_etags.add(etag) + collected_status.add(conn.resp.status) + + # default node_iter will exhaust at 2 * replicas + self.assertEqual(len(log), 2 * self.replicas()) + self.assertEqual( + {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']}, + collected_etags) + self.assertEqual({200}, collected_status) + + def test_GET_with_mixed_durable_frags_and_no_quorum_will_503(self): + # all nodes have a frag but there is no one set that reaches quorum, + # and since at least one is marked durable we *should* be able to + # reconstruct, so proxy will 503 obj1 = self._make_ec_object_stub(pattern='obj1') obj2 = self._make_ec_object_stub(pattern='obj2') obj3 = self._make_ec_object_stub(pattern='obj3') @@ -3001,7 +3304,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): {'obj': obj1, 'frag': 11, 'durable': False}, # parity {'obj': obj1, 'frag': 12, 'durable': False}, # parity {'obj': obj1, 'frag': 13, 'durable': False}, # parity - ] # handoffs not used in this scenario + ] + [[]] * self.replicas() # handoffs all 404 fake_response = self._fake_ec_node_response(list(node_frags)) @@ -3013,9 +3316,11 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.headers['etag'], obj1['etag']) self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) - self.assertEqual(self.policy.ec_ndata, len(log)) + self.assertGreaterEqual(len(log), self.policy.ec_ndata) collected_durables = [] for conn in log: + if not conn.resp.headers.get('X-Backend-Data-Timestamp'): + continue if (conn.resp.headers.get('X-Backend-Durable-Timestamp') == conn.resp.headers.get('X-Backend-Data-Timestamp')): collected_durables.append(conn) @@ -3044,7 +3349,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): {'obj': obj1, 'frag': 11, 'durable': False}, # parity {'obj': obj1, 'frag': 12, 'durable': False}, # parity {'obj': obj1, 'frag': 13, 'durable': False}, # parity - ] # handoffs not used in this scenario + ] + [[]] * self.replicas() # handoffs all 404 fake_response = self._fake_ec_node_response(list(node_frags)) @@ -3058,6 +3363,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): collected_durables = [] for conn in log: + if not conn.resp.headers.get('X-Backend-Data-Timestamp'): + continue if (conn.resp.headers.get('X-Backend-Durable-Timestamp') == conn.resp.headers.get('X-Backend-Data-Timestamp')): collected_durables.append(conn) @@ -3231,6 +3538,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): # min: proxy will GET 10 non-durable obj1 frags and then 10 obj frags self.assertGreaterEqual(len(log), 2 * self.policy.ec_ndata) + def 
test_GET_with_missing_durables_and_older_obscured_durables(self): # scenario: obj3 has 14 frags but only 2 are durable and these are # obscured by two non-durable frags of obj1. There is also a single # non-durable frag of obj2. The proxy will need to do at least 10 @@ -3259,7 +3567,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): [{'obj': obj3, 'frag': 11, 'durable': False}], [{'obj': obj3, 'frag': 12, 'durable': False}], [{'obj': obj3, 'frag': 13, 'durable': False}], - ] + ] + [[]] * self.replicas() # handoffs 404 fake_response = self._fake_ec_node_response(list(node_frags)) @@ -3271,7 +3579,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.headers['etag'], obj3['etag']) self.assertEqual(md5(resp.body).hexdigest(), obj3['etag']) self.assertGreaterEqual(len(log), self.policy.ec_ndata + 1) - self.assertLessEqual(len(log), self.policy.ec_ndata + 4) + self.assertLessEqual(len(log), (self.policy.ec_ndata * 2) + 1) def test_GET_with_missing_durables_and_older_non_durables(self): # scenario: non-durable frags of newer obj1 obscure all frags @@ -3453,7 +3761,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): StubResponse(416, frag_index=4), StubResponse(416, frag_index=5), StubResponse(416, frag_index=6), - # sneak in bogus extra responses + # sneak a couple bogus extra responses StubResponse(404), StubResponse(206, frag_index=8), # and then just "enough" more 416's @@ -3471,8 +3779,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 416) - # ec_ndata responses that must agree, plus the bogus extras - self.assertEqual(len(log), self.policy.ec_ndata + 2) + # we're going to engage ndata primaries, plus the bogus extra + # self.assertEqual(len(log), self.policy.ec_ndata + 2) + self.assertEqual([c.resp.status for c in log], + ([416] * 7) + [404, 206] + ([416] * 3)) def test_GET_with_missing_and_range_unsatisifiable(self): responses = [ # not quite ec_ndata frags on primaries @@ -3700,7 +4010,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): status_codes, body_iter, headers = zip(*responses + [ (404, [b''], {}) for i in range( self.policy.object_ring.max_more_nodes)]) - with set_http_connect(*status_codes, body_iter=body_iter, + with mocked_http_conn(*status_codes, body_iter=body_iter, headers=headers): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 200) @@ -3708,8 +4018,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): # resume but won't be able to give us all the right bytes self.assertNotEqual(md5(resp.body).hexdigest(), etag) error_lines = self.logger.get_lines_for_level('error') - self.assertEqual(self.replicas(), len(error_lines)) nparity = self.policy.ec_nparity + self.assertGreater(len(error_lines), nparity) for line in error_lines[:nparity]: self.assertIn('retrying', line) for line in error_lines[nparity:]: @@ -3720,7 +4030,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): test_data = (b'test' * segment_size)[:-333] etag = md5(test_data).hexdigest() ec_archive_bodies = self._make_ec_archive_bodies(test_data) - headers = {'X-Object-Sysmeta-Ec-Etag': etag} + headers = { + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(test_data), + } self.app.recoverable_node_timeout = 0.05 # first one is slow responses = [(200, SlowBody(ec_archive_bodies[0], 0.1), @@ -3737,11 +4050,100 @@ 
@@ -3737,11 +4050,100 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
                              headers=headers):
            resp = req.get_response(self.app)
        self.assertEqual(resp.status_int, 200)
-        self.assertTrue(md5(resp.body).hexdigest(), etag)
+        self.assertEqual(md5(resp.body).hexdigest(), etag)
         error_lines = self.logger.get_lines_for_level('error')
         self.assertEqual(1, len(error_lines))
         self.assertIn('retrying', error_lines[0])
 
+    def test_GET_read_timeout_resume_mixed_etag(self):
+        segment_size = self.policy.ec_segment_size
+        test_data2 = (b'blah1' * segment_size)[:-333]
+        test_data1 = (b'test' * segment_size)[:-333]
+        etag2 = md5(test_data2).hexdigest()
+        etag1 = md5(test_data1).hexdigest()
+        ec_archive_bodies2 = self._make_ec_archive_bodies(test_data2)
+        ec_archive_bodies1 = self._make_ec_archive_bodies(test_data1)
+        headers2 = {'X-Object-Sysmeta-Ec-Etag': etag2,
+                    'X-Object-Sysmeta-Ec-Content-Length': len(test_data2),
+                    'X-Backend-Timestamp': self.ts().internal}
+        headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1,
+                    'X-Object-Sysmeta-Ec-Content-Length': len(test_data1),
+                    'X-Backend-Timestamp': self.ts().internal}
+        responses = [
+            # 404
+            (404, [b''], {}),
+            # etag1
+            (200, ec_archive_bodies1[1], self._add_frag_index(1, headers1)),
+            # 404
+            (404, [b''], {}),
+            # etag1
+            (200, SlowBody(ec_archive_bodies1[3], 0.1), self._add_frag_index(
+                3, headers1)),
+            # etag2
+            (200, ec_archive_bodies2[4], self._add_frag_index(4, headers2)),
+            # etag1
+            (200, ec_archive_bodies1[5], self._add_frag_index(5, headers1)),
+            # etag2
+            (200, ec_archive_bodies2[6], self._add_frag_index(6, headers2)),
+            # etag1
+            (200, ec_archive_bodies1[7], self._add_frag_index(7, headers1)),
+            # etag2
+            (200, ec_archive_bodies2[8], self._add_frag_index(8, headers2)),
+            # etag1
+            (200, SlowBody(ec_archive_bodies1[9], 0.1), self._add_frag_index(
+                9, headers1)),
+            # etag2
+            (200, ec_archive_bodies2[10], self._add_frag_index(10, headers2)),
+            # etag1
+            (200, ec_archive_bodies1[11], self._add_frag_index(11, headers1)),
+            # etag2
+            (200, ec_archive_bodies2[12], self._add_frag_index(12, headers2)),
+            # 404
+            (404, [b''], {}),
+            # handoffs start here
+            # etag2
+            (200, ec_archive_bodies2[0], self._add_frag_index(0, headers2)),
+            # 404
+            (404, [b''], {}),
+            # etag1
+            (200, ec_archive_bodies1[2], self._add_frag_index(2, headers1)),
+            # 404
+            (404, [b''], {}),
+            # etag1
+            (200, ec_archive_bodies1[4], self._add_frag_index(4, headers1)),
+            # etag2
+            (200, ec_archive_bodies2[1], self._add_frag_index(1, headers2)),
+            # etag1
+            (200, ec_archive_bodies1[6], self._add_frag_index(6, headers1)),
+            # etag2
+            (200, ec_archive_bodies2[7], self._add_frag_index(7, headers2)),
+            # etag1
+            (200, ec_archive_bodies1[8], self._add_frag_index(8, headers1)),
+            # resume requests start here
+            # 404
+            (404, [b''], {}),
+            # etag2
+            (200, ec_archive_bodies2[3], self._add_frag_index(3, headers2)),
+            # 404
+            (404, [b''], {}),
+            # etag1
+            (200, ec_archive_bodies1[10], self._add_frag_index(10, headers1)),
+            # etag1
+            (200, ec_archive_bodies1[12], self._add_frag_index(12, headers1)),
+        ]
+        self.app.recoverable_node_timeout = 0.01
+        req = swob.Request.blank('/v1/a/c/o')
+        status_codes, body_iter, headers = zip(*responses)
+        with set_http_connect(*status_codes, body_iter=body_iter,
+                              headers=headers):
+            resp = req.get_response(self.app)
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(md5(resp.body).hexdigest(), etag1)
+        error_lines = self.logger.get_lines_for_level('error')
+        self.assertEqual(2, len(error_lines))
+        for line in error_lines:
+            self.assertIn('retrying', line)
+
     def test_fix_response_HEAD(self):
         headers = {'X-Object-Sysmeta-Ec-Content-Length': '10',
                    'X-Object-Sysmeta-Ec-Etag': 'foo'}
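The new test above builds its backend responses as (status, body, headers) triples and then unpacks them with zip(*responses); that call is plain transposition, yielding the three parallel tuples the mocked connections consume. A tiny standalone illustration with made-up values:

    responses = [
        (200, b'frag-a', {'X-Object-Sysmeta-Ec-Frag-Index': '0'}),
        (404, b'', {}),
        (200, b'frag-b', {'X-Object-Sysmeta-Ec-Frag-Index': '1'}),
    ]
    # transpose the list of triples into three parallel tuples
    status_codes, body_iter, headers = zip(*responses)
    assert status_codes == (200, 404, 200)
    assert body_iter == (b'frag-a', b'', b'frag-b')
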
@@ -3825,6 +4227,43 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         self.assertEqual(resp.etag, body_etag)
         self.assertEqual(resp.headers['Accept-Ranges'], 'bytes')
 
+    def test_non_durable_ec_response_bucket(self):
+        ts = self.ts()
+        bucket = obj.ECGetResponseBucket(self.policy, ts)
+        self.assertEqual(bucket.shortfall, self.policy.ec_ndata)
+        for i in range(1, self.policy.ec_ndata - self.policy.ec_nparity + 1):
+            stub_getter = mock.MagicMock(last_status=200, last_headers={
+                'X-Backend-Timestamp': ts.internal,
+                'X-Object-Sysmeta-Ec-Etag': 'the-etag',
+                'X-Object-Sysmeta-Ec-Frag-Index': str(i),
+            })
+            bucket.add_response(stub_getter, None)
+            self.assertEqual(bucket.shortfall, self.policy.ec_ndata - i)
+        self.assertEqual(bucket.shortfall, self.policy.ec_nparity)
+        self.assertFalse(bucket.durable)
+        expectations = (
+            4,  # 7
+            4,  # 8
+            4,  # 9
+            4,  # 10
+            3,  # 11
+            2,  # 12
+            1,  # 13
+            1,  # 14
+        )
+        for i, expected in zip(range(
+                self.policy.ec_ndata - self.policy.ec_nparity + 1,
+                self.policy.object_ring.replica_count + 1), expectations):
+            stub_getter = mock.MagicMock(last_status=200, last_headers={
+                'X-Backend-Timestamp': ts.internal,
+                'X-Object-Sysmeta-Ec-Etag': 'the-etag',
+                'X-Object-Sysmeta-Ec-Frag-Index': str(i),
+            })
+            bucket.add_response(stub_getter, None)
+            msg = 'With %r resp, expected shortfall %s != %s' % (
+                bucket.gets.keys(), expected, bucket.shortfall)
+            self.assertEqual(bucket.shortfall, expected, msg)
+
 
 class TestECFunctions(unittest.TestCase):
     def test_chunk_transformer(self):
@@ -3969,7 +4408,8 @@ class TestECDuplicationObjController(
             # the backend requests will stop at enough ec_ndata responses
             self.assertEqual(
                 len(frags), self.policy.ec_ndata,
-                'collected %s frags for etag %s' % (len(frags), etag))
+                'collected %s frags (expected %s) for etag %s' % (
+                    len(frags), self.policy.ec_ndata, etag))
 
    # TODO: actually "frags" in node_frags is meaning "node_index" right now
    # in following tests. Reconsidering the name and semantics change needed.
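test_non_durable_ec_response_bucket above encodes its expected shortfall values as a bare tuple keyed only by position. Restated against the assumed default test policy of EC 10+4 (ec_ndata=10, ec_nparity=4, 14 primaries), the asserted response-count to shortfall pairs are the following; this is only a rereading of the numbers in the test, not the formula from the proxy code.

    # Derived from the expectations tuple above; assumes the 10+4 test policy.
    expected_shortfall = {
        6: 4,                          # after the first loop: ec_ndata - 6
        7: 4, 8: 4, 9: 4, 10: 4,       # held at ec_nparity while non-durable
        11: 3, 12: 2, 13: 1, 14: 1,    # tapers off but never reaches zero
    }
    assert all(value >= 1 for value in expected_shortfall.values())
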
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index fdb3b32283..035d852671 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -1337,6 +1337,20 @@ class TestProxyServerLoading(unittest.TestCase):
         self.assertEqual(app.post_quorum_timeout, 0.3)
         self.assertEqual(app.concurrency_timeout, 0.2)
 
+    def test_concurrent_ec_options(self):
+        conf = {
+            'concurrent_gets': 'on',
+            'concurrency_timeout': '0.5',
+            'concurrent_ec_extra_requests': '4',
+        }
+        for policy in POLICIES:
+            policy.object_ring = FakeRing()
+        app = proxy_server.Application(conf, debug_logger(),
+                                       FakeRing(), FakeRing())
+        self.assertEqual(app.concurrent_ec_extra_requests, 4)
+        self.assertEqual(app.concurrent_gets, True)
+        self.assertEqual(app.concurrency_timeout, 0.5)
+
     def test_load_policy_rings(self):
         for policy in POLICIES:
             self.assertFalse(policy.object_ring)
@@ -4687,12 +4701,21 @@ class TestReplicatedObjectController(
         object_ring.max_more_nodes = 0
 
     def test_iter_nodes_calls_sort_nodes(self):
-        with mock.patch.object(self.app, 'sort_nodes') as sort_nodes:
+        called = []
+
+        def fake_sort_nodes(nodes, **kwargs):
+            # the caller may mutate the list we return during iteration;
+            # we're interested in the value as of call time
+            called.append(mock.call(list(nodes), **kwargs))
+            return nodes
+        with mock.patch.object(self.app, 'sort_nodes',
+                               side_effect=fake_sort_nodes):
            object_ring = self.app.get_object_ring(None)
            for node in self.app.iter_nodes(object_ring, 0):
                pass
-        sort_nodes.assert_called_once_with(
-            object_ring.get_part_nodes(0), policy=None)
+        self.assertEqual(called, [
+            mock.call(object_ring.get_part_nodes(0), policy=None)
+        ])
 
     def test_iter_nodes_skips_error_limited(self):
         with mock.patch.object(self.app, 'sort_nodes',
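The rewritten test_iter_nodes_calls_sort_nodes records list(nodes) at call time, presumably because the node iteration may consume the very list it handed to sort_nodes before the assertion runs. The same defensive pattern, reduced to a self-contained example with nothing Swift-specific in it:

    from unittest import mock

    def consume(sorter):
        nodes = [3, 1, 2]
        ordered = sorter(nodes)
        nodes.pop()  # the caller mutates the list after the call
        return ordered

    called = []

    def fake_sorter(nodes):
        # snapshot the argument now; mock's own call recording would only
        # keep a reference to the (later mutated) list
        called.append(mock.call(list(nodes)))
        return nodes

    consume(fake_sorter)
    assert called == [mock.call([3, 1, 2])]  # unaffected by the later pop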