diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 2912da3ea6..43ba6d695f 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -854,10 +854,10 @@ class ByteCountEnforcer(object): return chunk -class ResumingGetter(object): +class GetOrHeadHandler(object): def __init__(self, app, req, server_type, node_iter, partition, path, backend_headers, concurrency=1, client_chunk_size=None, - newest=None, header_provider=None): + newest=None): self.app = app self.node_iter = node_iter self.server_type = server_type @@ -871,7 +871,6 @@ class ResumingGetter(object): self.used_source_etag = '' self.concurrency = concurrency self.node = None - self.header_provider = header_provider self.latest_404_timestamp = Timestamp(0) # stuff from request @@ -1011,13 +1010,6 @@ class ResumingGetter(object): return True return is_success(src.status) or is_redirection(src.status) - def response_parts_iter(self, req): - source, node = self._get_source_and_node() - it = None - if source: - it = self._get_response_parts_iter(req, node, source) - return it - def _get_response_parts_iter(self, req, node, source): # Someday we can replace this [mess] with python 3's "nonlocal" source = [source] @@ -1261,10 +1253,6 @@ class ResumingGetter(object): if node in self.used_nodes: return False req_headers = dict(self.backend_headers) - # a request may be specialised with specific backend headers - if self.header_provider: - req_headers.update(self.header_provider()) - ip, port = get_ip_port(node, req_headers) start_node_timing = time.time() try: @@ -1300,9 +1288,8 @@ class ResumingGetter(object): close_swift_conn(possible_source) else: if self.used_source_etag and \ - self.used_source_etag != normalize_etag(src_headers.get( - 'x-object-sysmeta-ec-etag', - src_headers.get('etag', ''))): + self.used_source_etag != normalize_etag( + src_headers.get('etag', '')): self.statuses.append(HTTP_NOT_FOUND) self.reasons.append('') self.bodies.append('') @@ -1402,18 +1389,14 @@ class ResumingGetter(object): # Save off the source etag so that, if we lose the connection # and have to resume from a different node, we can be sure that - # we have the same object (replication) or a fragment archive - # from the same object (EC). Otherwise, if the cluster has two - # versions of the same object, we might end up switching between - # old and new mid-stream and giving garbage to the client. - self.used_source_etag = normalize_etag(src_headers.get( - 'x-object-sysmeta-ec-etag', src_headers.get('etag', ''))) + # we have the same object (replication). Otherwise, if the cluster + # has two versions of the same object, we might end up switching + # between old and new mid-stream and giving garbage to the client. + self.used_source_etag = normalize_etag(src_headers.get('etag', '')) self.node = node return source, node return None, None - -class GetOrHeadHandler(ResumingGetter): def _make_app_iter(self, req, node, source): """ Returns an iterator over the contents of the source (via its read diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index ceab334463..9f3d83b86f 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -24,6 +24,7 @@ # These shenanigans are to ensure all related objects can be garbage # collected. We've seen objects hang around forever otherwise. +import six from six.moves.urllib.parse import quote, unquote from six.moves import zip @@ -35,13 +36,14 @@ import time import math import random from hashlib import md5 -from swift import gettext_ as _ +import sys from greenlet import GreenletExit from eventlet import GreenPile, sleep from eventlet.queue import Queue from eventlet.timeout import Timeout +from swift import gettext_ as _ from swift.common.utils import ( clean_content_type, config_true_value, ContextPool, csv_append, GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout, @@ -54,18 +56,19 @@ from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout, ResponseTimeout, \ InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \ - PutterConnectError, ChunkReadError + PutterConnectError, ChunkReadError, RangeAlreadyComplete, ShortReadError from swift.common.header_key_dict import HeaderKeyDict from swift.common.http import ( is_informational, is_success, is_client_error, is_server_error, is_redirection, HTTP_CONTINUE, HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE, HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY, - HTTP_REQUESTED_RANGE_NOT_SATISFIABLE) + HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, HTTP_NOT_FOUND) from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, ECDriverError, PolicyError) from swift.proxy.controllers.base import Controller, delay_denial, \ - cors_validation, ResumingGetter, update_headers + cors_validation, update_headers, bytes_to_skip, close_swift_conn, \ + ByteCountEnforcer, source_key from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \ @@ -73,7 +76,8 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError, \ normalize_etag from swift.common.request_helpers import update_etag_is_at_header, \ - resolve_etag_is_at_header, validate_internal_obj, get_ip_port + resolve_etag_is_at_header, validate_internal_obj, get_ip_port, \ + http_response_to_document_iters def check_content_type(req): @@ -2034,7 +2038,7 @@ class ECGetResponseBucket(object): associated with the same frag_index then only one is included. :return: a list of sources, each source being a tuple of form - (ResumingGetter, iter) + (ECFragGetter, iter) """ all_sources = [] for frag_index, sources in self.gets.items(): @@ -2072,7 +2076,7 @@ class ECGetResponseBucket(object): class ECGetResponseCollection(object): """ - Manages all successful EC GET responses gathered by ResumingGetters. + Manages all successful EC GET responses gathered by ECFragGetters. A response comprises a tuple of (, ). All responses having the same data timestamp are placed in an @@ -2103,7 +2107,7 @@ class ECGetResponseCollection(object): Add a response to the collection. :param get: An instance of - :class:`~swift.proxy.controllers.base.ResumingGetter` + :class:`~swift.proxy.controllers.obj.ECFragGetter` :param parts_iter: An iterator over response body parts :raises ValueError: if the response etag or status code values do not match any values previously received for the same timestamp @@ -2233,6 +2237,564 @@ class ECGetResponseCollection(object): return nodes.pop(0).copy() +class ECFragGetter(object): + def __init__(self, app, req, server_type, node_iter, partition, path, + backend_headers, concurrency=1, client_chunk_size=None, + newest=None, header_provider=None): + self.app = app + self.node_iter = node_iter + self.server_type = server_type + self.partition = partition + self.path = path + self.backend_headers = backend_headers + self.client_chunk_size = client_chunk_size + self.skip_bytes = 0 + self.bytes_used_from_backend = 0 + self.used_nodes = [] + self.used_source_etag = '' + self.concurrency = concurrency + self.node = None + self.header_provider = header_provider + self.latest_404_timestamp = Timestamp(0) + + # stuff from request + self.req_method = req.method + self.req_path = req.path + self.req_query_string = req.query_string + if newest is None: + self.newest = config_true_value(req.headers.get('x-newest', 'f')) + else: + self.newest = newest + + # populated when finding source + self.statuses = [] + self.reasons = [] + self.bodies = [] + self.source_headers = [] + self.sources = [] + + # populated from response headers + self.start_byte = self.end_byte = self.length = None + + def fast_forward(self, num_bytes): + """ + Will skip num_bytes into the current ranges. + + :params num_bytes: the number of bytes that have already been read on + this request. This will change the Range header + so that the next req will start where it left off. + + :raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes + > end of range + 1 + :raises RangeAlreadyComplete: if begin + num_bytes == end of range + 1 + """ + try: + req_range = Range(self.backend_headers.get('Range')) + except ValueError: + req_range = None + + if req_range: + begin, end = req_range.ranges[0] + if begin is None: + # this is a -50 range req (last 50 bytes of file) + end -= num_bytes + if end == 0: + # we sent out exactly the first range's worth of bytes, so + # we're done with it + raise RangeAlreadyComplete() + + if end < 0: + raise HTTPRequestedRangeNotSatisfiable() + + else: + begin += num_bytes + if end is not None and begin == end + 1: + # we sent out exactly the first range's worth of bytes, so + # we're done with it + raise RangeAlreadyComplete() + + if end is not None and begin > end: + raise HTTPRequestedRangeNotSatisfiable() + + req_range.ranges = [(begin, end)] + req_range.ranges[1:] + self.backend_headers['Range'] = str(req_range) + else: + self.backend_headers['Range'] = 'bytes=%d-' % num_bytes + + # Reset so if we need to do this more than once, we don't double-up + self.bytes_used_from_backend = 0 + + def pop_range(self): + """ + Remove the first byterange from our Range header. + + This is used after a byterange has been completely sent to the + client; this way, should we need to resume the download from another + object server, we do not re-fetch byteranges that the client already + has. + + If we have no Range header, this is a no-op. + """ + if 'Range' in self.backend_headers: + try: + req_range = Range(self.backend_headers['Range']) + except ValueError: + # there's a Range header, but it's garbage, so get rid of it + self.backend_headers.pop('Range') + return + begin, end = req_range.ranges.pop(0) + if len(req_range.ranges) > 0: + self.backend_headers['Range'] = str(req_range) + else: + self.backend_headers.pop('Range') + + def learn_size_from_content_range(self, start, end, length): + """ + If client_chunk_size is set, makes sure we yield things starting on + chunk boundaries based on the Content-Range header in the response. + + Sets our Range header's first byterange to the value learned from + the Content-Range header in the response; if we were given a + fully-specified range (e.g. "bytes=123-456"), this is a no-op. + + If we were given a half-specified range (e.g. "bytes=123-" or + "bytes=-456"), then this changes the Range header to a + semantically-equivalent one *and* it lets us resume on a proper + boundary instead of just in the middle of a piece somewhere. + """ + if length == 0: + return + + if self.client_chunk_size: + self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) + + if 'Range' in self.backend_headers: + try: + req_range = Range(self.backend_headers['Range']) + new_ranges = [(start, end)] + req_range.ranges[1:] + except ValueError: + new_ranges = [(start, end)] + else: + new_ranges = [(start, end)] + + self.backend_headers['Range'] = ( + "bytes=" + (",".join("%s-%s" % (s if s is not None else '', + e if e is not None else '') + for s, e in new_ranges))) + + def is_good_source(self, src): + """ + Indicates whether or not the request made to the backend found + what it was looking for. + + :param src: the response from the backend + :returns: True if found, False if not + """ + if self.server_type == 'Object' and src.status == 416: + return True + return is_success(src.status) or is_redirection(src.status) + + def response_parts_iter(self, req): + source, node = self._get_source_and_node() + it = None + if source: + it = self._get_response_parts_iter(req, node, source) + return it + + def _get_response_parts_iter(self, req, node, source): + # Someday we can replace this [mess] with python 3's "nonlocal" + source = [source] + node = [node] + + try: + client_chunk_size = self.client_chunk_size + node_timeout = self.app.node_timeout + if self.server_type == 'Object': + node_timeout = self.app.recoverable_node_timeout + + # This is safe; it sets up a generator but does not call next() + # on it, so no IO is performed. + parts_iter = [ + http_response_to_document_iters( + source[0], read_chunk_size=self.app.object_chunk_size)] + + def get_next_doc_part(): + while True: + try: + # This call to next() performs IO when we have a + # multipart/byteranges response; it reads the MIME + # boundary and part headers. + # + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and either just returns source or + # raises StopIteration. + with WatchdogTimeout(self.app.watchdog, node_timeout, + ChunkReadTimeout): + # if StopIteration is raised, it escapes and is + # handled elsewhere + start_byte, end_byte, length, headers, part = next( + parts_iter[0]) + return (start_byte, end_byte, length, headers, part) + except ChunkReadTimeout: + new_source, new_node = self._get_source_and_node() + if new_source: + self.app.error_occurred( + node[0], _('Trying to read object during ' + 'GET (retrying)')) + # Close-out the connection as best as possible. + if getattr(source[0], 'swift_conn', None): + close_swift_conn(source[0]) + source[0] = new_source + node[0] = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. + parts_iter[0] = http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + else: + raise StopIteration() + + def iter_bytes_from_response_part(part_file, nbytes): + nchunks = 0 + buf = b'' + part_file = ByteCountEnforcer(part_file, nbytes) + while True: + try: + with WatchdogTimeout(self.app.watchdog, node_timeout, + ChunkReadTimeout): + chunk = part_file.read(self.app.object_chunk_size) + nchunks += 1 + # NB: this append must be *inside* the context + # manager for test.unit.SlowBody to do its thing + buf += chunk + if nbytes is not None: + nbytes -= len(chunk) + except (ChunkReadTimeout, ShortReadError): + exc_type, exc_value, exc_traceback = sys.exc_info() + if self.newest or self.server_type != 'Object': + raise + try: + self.fast_forward(self.bytes_used_from_backend) + except (HTTPException, ValueError): + six.reraise(exc_type, exc_value, exc_traceback) + except RangeAlreadyComplete: + break + buf = b'' + new_source, new_node = self._get_source_and_node() + if new_source: + self.app.error_occurred( + node[0], _('Trying to read object during ' + 'GET (retrying)')) + # Close-out the connection as best as possible. + if getattr(source[0], 'swift_conn', None): + close_swift_conn(source[0]) + source[0] = new_source + node[0] = new_node + # This is safe; it just sets up a generator but + # does not call next() on it, so no IO is + # performed. + parts_iter[0] = http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + + try: + _junk, _junk, _junk, _junk, part_file = \ + get_next_doc_part() + except StopIteration: + # Tried to find a new node from which to + # finish the GET, but failed. There's + # nothing more we can do here. + six.reraise(exc_type, exc_value, exc_traceback) + part_file = ByteCountEnforcer(part_file, nbytes) + else: + six.reraise(exc_type, exc_value, exc_traceback) + else: + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + self.bytes_used_from_backend += self.skip_bytes + self.skip_bytes = 0 + else: + self.skip_bytes -= len(buf) + self.bytes_used_from_backend += len(buf) + buf = b'' + + if not chunk: + if buf: + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(buf) + yield buf + buf = b'' + break + + if client_chunk_size is not None: + while len(buf) >= client_chunk_size: + client_chunk = buf[:client_chunk_size] + buf = buf[client_chunk_size:] + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += \ + len(client_chunk) + yield client_chunk + else: + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(buf) + yield buf + buf = b'' + + # This is for fairness; if the network is outpacing + # the CPU, we'll always be able to read and write + # data without encountering an EWOULDBLOCK, and so + # eventlet will not switch greenthreads on its own. + # We do it manually so that clients don't starve. + # + # The number 5 here was chosen by making stuff up. + # It's not every single chunk, but it's not too big + # either, so it seemed like it would probably be an + # okay choice. + # + # Note that we may trampoline to other greenthreads + # more often than once every 5 chunks, depending on + # how blocking our network IO is; the explicit sleep + # here simply provides a lower bound on the rate of + # trampolining. + if nchunks % 5 == 0: + sleep() + + part_iter = None + try: + while True: + start_byte, end_byte, length, headers, part = \ + get_next_doc_part() + # note: learn_size_from_content_range() sets + # self.skip_bytes + self.learn_size_from_content_range( + start_byte, end_byte, length) + self.bytes_used_from_backend = 0 + # not length; that refers to the whole object, so is the + # wrong value to use for GET-range responses + byte_count = ((end_byte - start_byte + 1) - self.skip_bytes + if (end_byte is not None + and start_byte is not None) + else None) + part_iter = iter_bytes_from_response_part(part, byte_count) + yield {'start_byte': start_byte, 'end_byte': end_byte, + 'entity_length': length, 'headers': headers, + 'part_iter': part_iter} + self.pop_range() + except StopIteration: + req.environ['swift.non_client_disconnect'] = True + finally: + if part_iter: + part_iter.close() + + except ChunkReadTimeout: + self.app.exception_occurred(node[0], _('Object'), + _('Trying to read during GET')) + raise + except ChunkWriteTimeout: + self.app.logger.warning( + _('Client did not read from proxy within %ss') % + self.app.client_timeout) + self.app.logger.increment('client_timeouts') + except GeneratorExit: + warn = True + req_range = self.backend_headers['Range'] + if req_range: + req_range = Range(req_range) + if len(req_range.ranges) == 1: + begin, end = req_range.ranges[0] + if end is not None and begin is not None: + if end - begin + 1 == self.bytes_used_from_backend: + warn = False + if not req.environ.get('swift.non_client_disconnect') and warn: + self.app.logger.warning('Client disconnected on read of %r', + self.path) + raise + except Exception: + self.app.logger.exception(_('Trying to send to client')) + raise + finally: + # Close-out the connection as best as possible. + if getattr(source[0], 'swift_conn', None): + close_swift_conn(source[0]) + + @property + def last_status(self): + if self.statuses: + return self.statuses[-1] + else: + return None + + @property + def last_headers(self): + if self.source_headers: + return HeaderKeyDict(self.source_headers[-1]) + else: + return None + + def _make_node_request(self, node, node_timeout, logger_thread_locals): + self.app.logger.thread_locals = logger_thread_locals + if node in self.used_nodes: + return False + req_headers = dict(self.backend_headers) + # a request may be specialised with specific backend headers + if self.header_provider: + req_headers.update(self.header_provider()) + ip, port = get_ip_port(node, req_headers) + start_node_timing = time.time() + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect( + ip, port, node['device'], + self.partition, self.req_method, self.path, + headers=req_headers, + query_string=self.req_query_string) + self.app.set_node_timing(node, time.time() - start_node_timing) + + with Timeout(node_timeout): + possible_source = conn.getresponse() + # See NOTE: swift_conn at top of file about this. + possible_source.swift_conn = conn + except (Exception, Timeout): + self.app.exception_occurred( + node, self.server_type, + _('Trying to %(method)s %(path)s') % + {'method': self.req_method, 'path': self.req_path}) + return False + + src_headers = dict( + (k.lower(), v) for k, v in + possible_source.getheaders()) + if self.is_good_source(possible_source): + # 404 if we know we don't have a synced copy + if not float(possible_source.getheader('X-PUT-Timestamp', 1)): + self.statuses.append(HTTP_NOT_FOUND) + self.reasons.append('') + self.bodies.append('') + self.source_headers.append([]) + close_swift_conn(possible_source) + else: + if self.used_source_etag and \ + self.used_source_etag != normalize_etag(src_headers.get( + 'x-object-sysmeta-ec-etag', + src_headers.get('etag', ''))): + self.statuses.append(HTTP_NOT_FOUND) + self.reasons.append('') + self.bodies.append('') + self.source_headers.append([]) + return False + + # a possible source should only be added as a valid source + # if its timestamp is newer than previously found tombstones + ps_timestamp = Timestamp( + src_headers.get('x-backend-data-timestamp') or + src_headers.get('x-backend-timestamp') or + src_headers.get('x-put-timestamp') or + src_headers.get('x-timestamp') or 0) + if ps_timestamp >= self.latest_404_timestamp: + self.statuses.append(possible_source.status) + self.reasons.append(possible_source.reason) + self.bodies.append(None) + self.source_headers.append(possible_source.getheaders()) + self.sources.append((possible_source, node)) + if not self.newest: # one good source is enough + return True + else: + if 'handoff_index' in node and \ + (is_server_error(possible_source.status) or + possible_source.status == HTTP_NOT_FOUND) and \ + not Timestamp(src_headers.get('x-backend-timestamp', 0)): + # throw out 5XX and 404s from handoff nodes unless the data is + # really on disk and had been DELETEd + return False + self.statuses.append(possible_source.status) + self.reasons.append(possible_source.reason) + self.bodies.append(possible_source.read()) + self.source_headers.append(possible_source.getheaders()) + + # if 404, record the timestamp. If a good source shows up, its + # timestamp will be compared to the latest 404. + # For now checking only on objects, but future work could include + # the same check for account and containers. See lp 1560574. + if self.server_type == 'Object' and \ + possible_source.status == HTTP_NOT_FOUND: + hdrs = HeaderKeyDict(possible_source.getheaders()) + ts = Timestamp(hdrs.get('X-Backend-Timestamp', 0)) + if ts > self.latest_404_timestamp: + self.latest_404_timestamp = ts + if possible_source.status == HTTP_INSUFFICIENT_STORAGE: + self.app.error_limit(node, _('ERROR Insufficient Storage')) + elif is_server_error(possible_source.status): + self.app.error_occurred( + node, _('ERROR %(status)d %(body)s ' + 'From %(type)s Server') % + {'status': possible_source.status, + 'body': self.bodies[-1][:1024], + 'type': self.server_type}) + return False + + def _get_source_and_node(self): + self.statuses = [] + self.reasons = [] + self.bodies = [] + self.source_headers = [] + self.sources = [] + + nodes = GreenthreadSafeIterator(self.node_iter) + + node_timeout = self.app.node_timeout + if self.server_type == 'Object' and not self.newest: + node_timeout = self.app.recoverable_node_timeout + + pile = GreenAsyncPile(self.concurrency) + + for node in nodes: + pile.spawn(self._make_node_request, node, node_timeout, + self.app.logger.thread_locals) + _timeout = self.app.concurrency_timeout \ + if pile.inflight < self.concurrency else None + if pile.waitfirst(_timeout): + break + else: + # ran out of nodes, see if any stragglers will finish + any(pile) + + # this helps weed out any sucess status that were found before a 404 + # and added to the list in the case of x-newest. + if self.sources: + self.sources = [s for s in self.sources + if source_key(s[0]) >= self.latest_404_timestamp] + + if self.sources: + self.sources.sort(key=lambda s: source_key(s[0])) + source, node = self.sources.pop() + for src, _junk in self.sources: + close_swift_conn(src) + self.used_nodes.append(node) + src_headers = dict( + (k.lower(), v) for k, v in + source.getheaders()) + + # Save off the source etag so that, if we lose the connection + # and have to resume from a different node, we can be sure that + # we have the same object (replication) or a fragment archive + # from the same object (EC). Otherwise, if the cluster has two + # versions of the same object, we might end up switching between + # old and new mid-stream and giving garbage to the client. + self.used_source_etag = normalize_etag(src_headers.get( + 'x-object-sysmeta-ec-etag', src_headers.get('etag', ''))) + self.node = node + return source, node + return None, None + + @ObjectControllerRouter.register(EC_POLICY) class ECObjectController(BaseObjectController): def _fragment_GET_request(self, req, node_iter, partition, policy, @@ -2243,11 +2805,11 @@ class ECObjectController(BaseObjectController): backend_headers = self.generate_request_headers( req, additional=req.headers) - getter = ResumingGetter(self.app, req, 'Object', node_iter, - partition, req.swift_entity_path, - backend_headers, - client_chunk_size=policy.fragment_size, - newest=False, header_provider=header_provider) + getter = ECFragGetter(self.app, req, 'Object', node_iter, + partition, req.swift_entity_path, + backend_headers, + client_chunk_size=policy.fragment_size, + newest=False, header_provider=header_provider) return (getter, getter.response_parts_iter(req)) def _convert_range(self, req, policy): @@ -2330,7 +2892,7 @@ class ECObjectController(BaseObjectController): # nodes contain different fragments. Also EC has implemented it's # own specific implementation of concurrent gets to ec_ndata nodes. # So we don't need to worry about plumbing and sending a - # concurrency value to ResumingGetter. + # concurrency value to ECFragGetter. with ContextPool(policy.ec_ndata) as pool: pile = GreenAsyncPile(pool) buckets = ECGetResponseCollection(policy) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index e9cebfe930..fdb3b32283 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -7297,7 +7297,7 @@ class BaseTestECObjectController(BaseTestObjectController): return super(WrappedTimeout, self).__exit__(typ, value, tb) timeouts = {} - with mock.patch('swift.proxy.controllers.base.WatchdogTimeout', + with mock.patch('swift.proxy.controllers.obj.WatchdogTimeout', WrappedTimeout): with mock.patch.object(_test_servers[0], 'client_timeout', new=5): # get object