Extend concurrent_gets to EC GET requests

After the initial requests are started, if the proxy still does not
have enough backend responses to form a client response, additional
requests are spawned to the remaining primaries at the frequency
configured by the concurrency_timeout option.

A new tunable, concurrent_ec_extra_requests, allows operators to
control how many backend fragment requests are started immediately
when a client requests an object stored in an EC storage policy.  By
default only the minimum of ndata backend requests are started
immediately, but operators may increase concurrent_ec_extra_requests
up to nparity, which is similar in effect to a concurrency_timeout
of 0.
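
For illustration, the arithmetic behind how many fragment GETs start
immediately looks roughly like this (a sketch only; the attribute names
mirror the patch, and the clamp to nparity reflects the documented
guidance to operators rather than enforcement in the code):

    def initial_ec_get_request_count(policy, concurrent_ec_extra_requests):
        # e.g. a 10+4 policy with concurrent_ec_extra_requests = 2 starts
        # 12 backend fragment GETs up front instead of the default 10.
        extra = min(concurrent_ec_extra_requests, policy.ec_nparity)
        return policy.ec_ndata + extra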

Change-Id: Ia0a9398107a400815be2e0097b1b8e76336a0253
Author: Clay Gerrard
Date:   2020-02-28 09:48:37 -06:00
Parent: 5b2c846c69
Commit: 8f60e0a260

7 changed files with 875 additions and 336 deletions


@@ -200,12 +200,14 @@ use = egg:swift#proxy
 # the number of seconds configured by timing_expiry.
 # timing_expiry = 300
 #
-# By default on a GET/HEAD swift will connect to a storage node one at a time
-# in a single thread. There is smarts in the order they are hit however. If you
-# turn on concurrent_gets below, then replica count threads will be used.
-# With addition of the concurrency_timeout option this will allow swift to send
-# out GET/HEAD requests to the storage nodes concurrently and answer with the
-# first to respond. With an EC policy the parameter only affects HEAD requests.
+# By default on a GET/HEAD swift will connect to a minimum number storage nodes
+# in a minimum number of threads - for replicated data just a single request to
+# a single node one at a time. When enabled concurrent_gets allows the proxy,
+# to use up to replica count threads when waiting on a response. In
+# conjunction with the concurrency_timeout option this will allow swift to send
+# out GET/HEAD requests to the storage nodes concurrently and answer as soon as
+# the minimum number of backend responses are availabe - in replicated contexts
+# this will be the first backend replica to respond.
 # concurrent_gets = off
 #
 # This parameter controls how long to wait before firing off the next
@@ -215,6 +217,13 @@ use = egg:swift#proxy
 # conn_timeout parameter.
 # concurrency_timeout = 0.5
 #
+# By default on a EC GET request swift will connect to a minimum number of
+# storage nodes in a minimum number of threads - for erasure coded data, ndata
+# requests to primary nodes are started at the same time. When greater than
+# zero this option provides additional robustness and may reduce first byte
+# latency by starting additional requests - up to as many as nparity.
+# concurrent_ec_extra_requests = 0
+#
 # Set to the number of nodes to contact for a normal request. You can use
 # '* replicas' at the end to have it use the number given times the number of
 # replicas for the ring being used for the request.
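
As an illustration only (hypothetical values, not a recommendation), an
operator running a 10+4 EC policy who wants two spare fragment requests
started up front, plus timeout-driven feeding of the remaining primaries,
might set the following in the [app:proxy-server] section:

    concurrent_gets = on
    concurrency_timeout = 0.5
    concurrent_ec_extra_requests = 2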


@@ -1488,18 +1488,22 @@ class NodeIter(object):
         if node_iter is None:
             node_iter = itertools.chain(
                 part_nodes, ring.get_more_nodes(partition))
-        num_primary_nodes = len(part_nodes)
-        self.nodes_left = self.app.request_node_count(num_primary_nodes)
-        self.expected_handoffs = self.nodes_left - num_primary_nodes
+        self.num_primary_nodes = len(part_nodes)
+        self.nodes_left = self.app.request_node_count(self.num_primary_nodes)
+        self.expected_handoffs = self.nodes_left - self.num_primary_nodes
 
         # Use of list() here forcibly yanks the first N nodes (the primary
         # nodes) from node_iter, so the rest of its values are handoffs.
         self.primary_nodes = self.app.sort_nodes(
-            list(itertools.islice(node_iter, num_primary_nodes)),
+            list(itertools.islice(node_iter, self.num_primary_nodes)),
             policy=policy)
         self.handoff_iter = node_iter
         self._node_provider = None
 
+    @property
+    def primaries_left(self):
+        return len(self.primary_nodes)
+
     def __iter__(self):
         self._node_iter = self._node_gen()
         return self
@@ -1523,7 +1527,7 @@ class NodeIter(object):
             self.app.logger.increment('handoff_count')
             self.app.logger.warning(
                 'Handoff requested (%d)' % handoffs)
-            if (extra_handoffs == len(self.primary_nodes)):
+            if (extra_handoffs == self.num_primary_nodes):
                 # all the primaries were skipped, and handoffs didn't help
                 self.app.logger.increment('handoff_all_count')
 
@@ -1539,7 +1543,8 @@ class NodeIter(object):
         self._node_provider = callback
 
     def _node_gen(self):
-        for node in self.primary_nodes:
+        while self.primary_nodes:
+            node = self.primary_nodes.pop(0)
             if not self.app.error_limited(node):
                 yield node
                 if not self.app.error_limited(node):
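
The new primaries_left property gives callers a cheap way to ask whether
any primary nodes remain unvisited. Below is a much simplified sketch of
how the EC GET path added later in this change consumes it (illustrative
only; the real feed_remaining_primaries() also passes the request,
partition, policy and extra headers through to the spawned getter):

    from eventlet.queue import Empty

    def keep_feeding(feeder_q, node_iter, spawn_getter, timeout):
        # wake up every `timeout` seconds; if no stop signal has arrived
        # and primaries remain, start one more backend fragment request
        while True:
            try:
                feeder_q.get(timeout=timeout)
            except Empty:
                if node_iter.primaries_left:
                    spawn_getter()
                else:
                    break  # ran out of primaries
            else:
                break  # got a stop signal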


@@ -40,7 +40,7 @@ import sys
 
 from greenlet import GreenletExit
 from eventlet import GreenPile, sleep
-from eventlet.queue import Queue
+from eventlet.queue import Queue, Empty
 from eventlet.timeout import Timeout
 
 from swift import gettext_ as _
@@ -68,7 +68,7 @@ from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
                                          ECDriverError, PolicyError)
 from swift.proxy.controllers.base import Controller, delay_denial, \
     cors_validation, update_headers, bytes_to_skip, close_swift_conn, \
-    ByteCountEnforcer, source_key
+    ByteCountEnforcer
 from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
     HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
     HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \
@@ -1400,7 +1400,7 @@ class ECAppIter(object):
                 # killed by contextpool
                 pass
             except ChunkReadTimeout:
-                # unable to resume in GetOrHeadHandler
+                # unable to resume in ECFragGetter
                 self.logger.exception(_("Timeout fetching fragments for %r"),
                                       quote(self.path))
             except:  # noqa
@@ -1984,13 +1984,13 @@ class ECGetResponseBucket(object):
     A helper class to encapsulate the properties of buckets in which fragment
     getters and alternate nodes are collected.
     """
-    def __init__(self, policy, timestamp_str):
+    def __init__(self, policy, timestamp):
         """
         :param policy: an instance of ECStoragePolicy
-        :param timestamp_str: a string representation of a timestamp
+        :param timestamp: a Timestamp, or None for a bucket of error reponses
         """
         self.policy = policy
-        self.timestamp_str = timestamp_str
+        self.timestamp = timestamp
         self.gets = collections.defaultdict(list)
         self.alt_nodes = collections.defaultdict(list)
         self._durable = False
@@ -2004,10 +2014,20 @@ class ECGetResponseBucket(object):
         return self._durable
 
     def add_response(self, getter, parts_iter):
+        """
+        Add another response to this bucket. Response buckets can be for
+        fragments with the same timestamp, or for errors with the same status.
+        """
+        headers = getter.last_headers
+        timestamp_str = headers.get('X-Backend-Timestamp',
+                                    headers.get('X-Timestamp'))
+        if timestamp_str:
+            # 404s will keep the most recent timestamp
+            self.timestamp = max(Timestamp(timestamp_str), self.timestamp)
         if not self.gets:
+            self.status = getter.last_status
             # stash first set of backend headers, which will be used to
             # populate a client response
-            self.status = getter.last_status
             # TODO: each bucket is for a single *data* timestamp, but sources
             # in the same bucket may have different *metadata* timestamps if
             # some backends have more recent .meta files than others. Currently
@@ -2017,18 +2027,17 @@ class ECGetResponseBucket(object):
             # recent metadata. We could alternatively choose to the *newest*
             # metadata headers for self.headers by selecting the source with
             # the latest X-Timestamp.
-            self.headers = getter.last_headers
-        elif (self.timestamp_str is not None and  # ie, not bad_bucket
-              getter.last_headers.get('X-Object-Sysmeta-Ec-Etag') !=
-                self.headers.get('X-Object-Sysmeta-Ec-Etag')):
+            self.headers = headers
+        elif headers.get('X-Object-Sysmeta-Ec-Etag') != \
+                self.headers.get('X-Object-Sysmeta-Ec-Etag'):
             # Fragments at the same timestamp with different etags are never
-            # expected. If somehow it happens then ignore those fragments
-            # to avoid mixing fragments that will not reconstruct otherwise
-            # an exception from pyeclib is almost certain. This strategy leaves
-            # a possibility that a set of consistent frags will be gathered.
+            # expected and error buckets shouldn't have this header. If somehow
+            # this happens then ignore those responses to avoid mixing
+            # fragments that will not reconstruct otherwise an exception from
+            # pyeclib is almost certain.
             raise ValueError("ETag mismatch")
-        frag_index = getter.last_headers.get('X-Object-Sysmeta-Ec-Frag-Index')
+        frag_index = headers.get('X-Object-Sysmeta-Ec-Frag-Index')
         frag_index = int(frag_index) if frag_index is not None else None
         self.gets[frag_index].append((getter, parts_iter))
@@ -2056,8 +2065,19 @@ class ECGetResponseBucket(object):
 
     @property
     def shortfall(self):
-        result = self.policy.ec_ndata - len(self.get_responses())
-        return max(result, 0)
+        """
+        The number of additional responses needed to complete this bucket;
+        typically (ndata - resp_count).
+
+        If the bucket has no durable responses, shortfall is extended out to
+        replica count to ensure the proxy makes additional primary requests.
+        """
+        resp_count = len(self.get_responses())
+        if self.durable or self.status == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE:
+            return max(self.policy.ec_ndata - resp_count, 0)
+        alt_count = min(self.policy.object_ring.replica_count - resp_count,
+                        self.policy.ec_nparity)
+        return max([1, self.policy.ec_ndata - resp_count, alt_count])
 
     @property
     def shortfall_with_alts(self):
@@ -2070,7 +2090,7 @@ class ECGetResponseBucket(object):
     def __str__(self):
         # return a string summarising bucket state, useful for debugging.
         return '<%s, %s, %s, %s(%s), %s>' \
-            % (self.timestamp_str, self.status, self._durable,
+            % (self.timestamp.internal, self.status, self._durable,
                self.shortfall, self.shortfall_with_alts, len(self.gets))
 
@@ -2092,15 +2112,24 @@ class ECGetResponseCollection(object):
""" """
self.policy = policy self.policy = policy
self.buckets = {} self.buckets = {}
self.bad_buckets = {None: ECGetResponseBucket(self.policy, None)}
self.node_iter_count = 0 self.node_iter_count = 0
def _get_bucket(self, timestamp_str): def _get_bucket(self, timestamp):
""" """
:param timestamp_str: a string representation of a timestamp :param timestamp: a Timestamp
:return: ECGetResponseBucket for given timestamp :return: ECGetResponseBucket for given timestamp
""" """
return self.buckets.setdefault( return self.buckets.setdefault(
timestamp_str, ECGetResponseBucket(self.policy, timestamp_str)) timestamp, ECGetResponseBucket(self.policy, timestamp))
def _get_bad_bucket(self, status):
"""
:param status: a representation of status
:return: ECGetResponseBucket for given status
"""
return self.bad_buckets.setdefault(
status, ECGetResponseBucket(self.policy, None))
def add_response(self, get, parts_iter): def add_response(self, get, parts_iter):
""" """
@@ -2112,13 +2141,31 @@ class ECGetResponseCollection(object):
:raises ValueError: if the response etag or status code values do not :raises ValueError: if the response etag or status code values do not
match any values previously received for the same timestamp match any values previously received for the same timestamp
""" """
if is_success(get.last_status):
self.add_good_response(get, parts_iter)
else:
self.add_bad_resp(get, parts_iter)
def add_bad_resp(self, get, parts_iter):
bad_bucket = self._get_bad_bucket(get.last_status)
bad_bucket.add_response(get, parts_iter)
def add_good_response(self, get, parts_iter):
headers = get.last_headers headers = get.last_headers
# Add the response to the appropriate bucket keyed by data file # Add the response to the appropriate bucket keyed by data file
# timestamp. Fall back to using X-Backend-Timestamp as key for object # timestamp. Fall back to using X-Backend-Timestamp as key for object
# servers that have not been upgraded. # servers that have not been upgraded.
t_data_file = headers.get('X-Backend-Data-Timestamp') t_data_file = headers.get('X-Backend-Data-Timestamp')
t_obj = headers.get('X-Backend-Timestamp', headers.get('X-Timestamp')) t_obj = headers.get('X-Backend-Timestamp', headers.get('X-Timestamp'))
self._get_bucket(t_data_file or t_obj).add_response(get, parts_iter) if t_data_file:
timestamp = Timestamp(t_data_file)
elif t_obj:
timestamp = Timestamp(t_obj)
else:
# Don't think this should ever come up in practice,
# but tests cover it
timestamp = None
self._get_bucket(timestamp).add_response(get, parts_iter)
# The node may also have alternate fragments indexes (possibly at # The node may also have alternate fragments indexes (possibly at
# different timestamps). For each list of alternate fragments indexes, # different timestamps). For each list of alternate fragments indexes,
@@ -2126,6 +2173,7 @@ class ECGetResponseCollection(object):
# list to that bucket's alternate nodes. # list to that bucket's alternate nodes.
frag_sets = safe_json_loads(headers.get('X-Backend-Fragments')) or {} frag_sets = safe_json_loads(headers.get('X-Backend-Fragments')) or {}
for t_frag, frag_set in frag_sets.items(): for t_frag, frag_set in frag_sets.items():
t_frag = Timestamp(t_frag)
self._get_bucket(t_frag).add_alternate_nodes(get.node, frag_set) self._get_bucket(t_frag).add_alternate_nodes(get.node, frag_set)
# If the response includes a durable timestamp then mark that bucket as # If the response includes a durable timestamp then mark that bucket as
# durable. Note that this may be a different bucket than the one this # durable. Note that this may be a different bucket than the one this
@@ -2137,7 +2185,7 @@ class ECGetResponseCollection(object):
# obj server not upgraded so assume this response's frag is durable # obj server not upgraded so assume this response's frag is durable
t_durable = t_obj t_durable = t_obj
if t_durable: if t_durable:
self._get_bucket(t_durable).set_durable() self._get_bucket(Timestamp(t_durable)).set_durable()
def _sort_buckets(self): def _sort_buckets(self):
def key_fn(bucket): def key_fn(bucket):
@@ -2150,35 +2198,65 @@ class ECGetResponseCollection(object):
return (bucket.durable, return (bucket.durable,
bucket.shortfall <= 0, bucket.shortfall <= 0,
-1 * bucket.shortfall_with_alts, -1 * bucket.shortfall_with_alts,
bucket.timestamp_str) bucket.timestamp)
return sorted(self.buckets.values(), key=key_fn, reverse=True) return sorted(self.buckets.values(), key=key_fn, reverse=True)
@property @property
def best_bucket(self): def best_bucket(self):
""" """
Return the best bucket in the collection. Return the "best" bucket in the collection.
The "best" bucket is the newest timestamp with sufficient getters, or The "best" bucket is the newest timestamp with sufficient getters, or
the closest to having sufficient getters, unless it is bettered by a the closest to having sufficient getters, unless it is bettered by a
bucket with potential alternate nodes. bucket with potential alternate nodes.
If there are no good buckets we return the "least_bad" bucket.
:return: An instance of :class:`~ECGetResponseBucket` or None if there :return: An instance of :class:`~ECGetResponseBucket` or None if there
are no buckets in the collection. are no buckets in the collection.
""" """
sorted_buckets = self._sort_buckets() sorted_buckets = self._sort_buckets()
if sorted_buckets: for bucket in sorted_buckets:
return sorted_buckets[0] # tombstones will set bad_bucket.timestamp
return None not_found_bucket = self.bad_buckets.get(404)
if not_found_bucket and not_found_bucket.timestamp and \
bucket.timestamp < not_found_bucket.timestamp:
# "good bucket" is trumped by newer tombstone
continue
return bucket
return self.least_bad_bucket
@property
def least_bad_bucket(self):
"""
Return the bad_bucket with the smallest shortfall
"""
# we want "enough" 416s to prevent "extra" requests - but we keep
# digging on 404s
short, status = min((bucket.shortfall, status)
for status, bucket in self.bad_buckets.items()
if status != 404)
return self.bad_buckets[status]
@property
def shortfall(self):
best_bucket = self.best_bucket
shortfall = best_bucket.shortfall
return min(shortfall, self.least_bad_bucket.shortfall)
@property
def durable(self):
return self.best_bucket.durable
def _get_frag_prefs(self): def _get_frag_prefs(self):
# Construct the current frag_prefs list, with best_bucket prefs first. # Construct the current frag_prefs list, with best_bucket prefs first.
frag_prefs = [] frag_prefs = []
for bucket in self._sort_buckets(): for bucket in self._sort_buckets():
if bucket.timestamp_str: if bucket.timestamp:
exclusions = [fi for fi in bucket.gets if fi is not None] exclusions = [fi for fi in bucket.gets if fi is not None]
prefs = {'timestamp': bucket.timestamp_str, prefs = {'timestamp': bucket.timestamp.internal,
'exclude': exclusions} 'exclude': exclusions}
frag_prefs.append(prefs) frag_prefs.append(prefs)
@@ -2237,44 +2315,34 @@ class ECGetResponseCollection(object):
return nodes.pop(0).copy() return nodes.pop(0).copy()
def is_good_source(status):
"""
Indicates whether or not the request made to the backend found
what it was looking for.
:param status: the response from the backend
:returns: True if found, False if not
"""
if status == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE:
return True
return is_success(status) or is_redirection(status)
class ECFragGetter(object): class ECFragGetter(object):
def __init__(self, app, req, server_type, node_iter, partition, path,
backend_headers, concurrency=1, client_chunk_size=None, def __init__(self, app, req, node_iter, partition, policy, path,
newest=None, header_provider=None): backend_headers, header_provider=None):
self.app = app self.app = app
self.req = req
self.node_iter = node_iter self.node_iter = node_iter
self.server_type = server_type
self.partition = partition self.partition = partition
self.path = path self.path = path
self.backend_headers = backend_headers self.backend_headers = backend_headers
self.client_chunk_size = client_chunk_size self.header_provider = header_provider
self.req_query_string = req.query_string
self.client_chunk_size = policy.fragment_size
self.skip_bytes = 0 self.skip_bytes = 0
self.bytes_used_from_backend = 0 self.bytes_used_from_backend = 0
self.used_nodes = []
self.used_source_etag = ''
self.concurrency = concurrency
self.node = None
self.header_provider = header_provider
self.latest_404_timestamp = Timestamp(0)
# stuff from request
self.req_method = req.method
self.req_path = req.path
self.req_query_string = req.query_string
if newest is None:
self.newest = config_true_value(req.headers.get('x-newest', 'f'))
else:
self.newest = newest
# populated when finding source
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
self.sources = []
# populated from response headers
self.start_byte = self.end_byte = self.length = None
def fast_forward(self, num_bytes): def fast_forward(self, num_bytes):
""" """
@@ -2382,20 +2450,11 @@ class ECFragGetter(object):
e if e is not None else '') e if e is not None else '')
for s, e in new_ranges))) for s, e in new_ranges)))
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
what it was looking for.
:param src: the response from the backend
:returns: True if found, False if not
"""
if self.server_type == 'Object' and src.status == 416:
return True
return is_success(src.status) or is_redirection(src.status)
def response_parts_iter(self, req): def response_parts_iter(self, req):
source, node = self._get_source_and_node() try:
source, node = next(self.source_and_node_iter)
except StopIteration:
return
it = None it = None
if source: if source:
it = self._get_response_parts_iter(req, node, source) it = self._get_response_parts_iter(req, node, source)
@@ -2408,9 +2467,7 @@ class ECFragGetter(object):
try: try:
client_chunk_size = self.client_chunk_size client_chunk_size = self.client_chunk_size
node_timeout = self.app.node_timeout node_timeout = self.app.recoverable_node_timeout
if self.server_type == 'Object':
node_timeout = self.app.recoverable_node_timeout
# This is safe; it sets up a generator but does not call next() # This is safe; it sets up a generator but does not call next()
# on it, so no IO is performed. # on it, so no IO is performed.
@@ -2437,7 +2494,7 @@ class ECFragGetter(object):
parts_iter[0]) parts_iter[0])
return (start_byte, end_byte, length, headers, part) return (start_byte, end_byte, length, headers, part)
except ChunkReadTimeout: except ChunkReadTimeout:
new_source, new_node = self._get_source_and_node() new_source, new_node = self._dig_for_source_and_node()
if new_source: if new_source:
self.app.error_occurred( self.app.error_occurred(
node[0], _('Trying to read object during ' node[0], _('Trying to read object during '
@@ -2472,8 +2529,6 @@ class ECFragGetter(object):
nbytes -= len(chunk) nbytes -= len(chunk)
except (ChunkReadTimeout, ShortReadError): except (ChunkReadTimeout, ShortReadError):
exc_type, exc_value, exc_traceback = sys.exc_info() exc_type, exc_value, exc_traceback = sys.exc_info()
if self.newest or self.server_type != 'Object':
raise
try: try:
self.fast_forward(self.bytes_used_from_backend) self.fast_forward(self.bytes_used_from_backend)
except (HTTPException, ValueError): except (HTTPException, ValueError):
@@ -2481,7 +2536,7 @@ class ECFragGetter(object):
except RangeAlreadyComplete: except RangeAlreadyComplete:
break break
buf = b'' buf = b''
new_source, new_node = self._get_source_and_node() new_source, new_node = self._dig_for_source_and_node()
if new_source: if new_source:
self.app.error_occurred( self.app.error_occurred(
node[0], _('Trying to read object during ' node[0], _('Trying to read object during '
@@ -2627,33 +2682,26 @@ class ECFragGetter(object):
@property @property
def last_status(self): def last_status(self):
if self.statuses: return self.status or HTTP_INTERNAL_SERVER_ERROR
return self.statuses[-1]
else:
return None
@property @property
def last_headers(self): def last_headers(self):
if self.source_headers: if self.source_headers:
return HeaderKeyDict(self.source_headers[-1]) return HeaderKeyDict(self.source_headers)
else: else:
return None return HeaderKeyDict()
def _make_node_request(self, node, node_timeout, logger_thread_locals): def _make_node_request(self, node, node_timeout, logger_thread_locals):
self.app.logger.thread_locals = logger_thread_locals self.app.logger.thread_locals = logger_thread_locals
if node in self.used_nodes:
return False
req_headers = dict(self.backend_headers) req_headers = dict(self.backend_headers)
# a request may be specialised with specific backend headers
if self.header_provider:
req_headers.update(self.header_provider())
ip, port = get_ip_port(node, req_headers) ip, port = get_ip_port(node, req_headers)
req_headers.update(self.header_provider())
start_node_timing = time.time() start_node_timing = time.time()
try: try:
with ConnectionTimeout(self.app.conn_timeout): with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect( conn = http_connect(
ip, port, node['device'], ip, port, node['device'],
self.partition, self.req_method, self.path, self.partition, 'GET', self.path,
headers=req_headers, headers=req_headers,
query_string=self.req_query_string) query_string=self.req_query_string)
self.app.set_node_timing(node, time.time() - start_node_timing) self.app.set_node_timing(node, time.time() - start_node_timing)
@@ -2664,134 +2712,69 @@ class ECFragGetter(object):
possible_source.swift_conn = conn possible_source.swift_conn = conn
except (Exception, Timeout): except (Exception, Timeout):
self.app.exception_occurred( self.app.exception_occurred(
node, self.server_type, node, 'Object',
_('Trying to %(method)s %(path)s') % _('Trying to %(method)s %(path)s') %
{'method': self.req_method, 'path': self.req_path}) {'method': self.req.method, 'path': self.req.path})
return False return None
src_headers = dict( src_headers = dict(
(k.lower(), v) for k, v in (k.lower(), v) for k, v in
possible_source.getheaders()) possible_source.getheaders())
if self.is_good_source(possible_source):
# 404 if we know we don't have a synced copy
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
close_swift_conn(possible_source)
else:
if self.used_source_etag and \
self.used_source_etag != normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag',
src_headers.get('etag', ''))):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
return False
# a possible source should only be added as a valid source if 'handoff_index' in node and \
# if its timestamp is newer than previously found tombstones (is_server_error(possible_source.status) or
ps_timestamp = Timestamp( possible_source.status == HTTP_NOT_FOUND) and \
src_headers.get('x-backend-data-timestamp') or not Timestamp(src_headers.get('x-backend-timestamp', 0)):
src_headers.get('x-backend-timestamp') or # throw out 5XX and 404s from handoff nodes unless the data is
src_headers.get('x-put-timestamp') or # really on disk and had been DELETEd
src_headers.get('x-timestamp') or 0) return None
if ps_timestamp >= self.latest_404_timestamp:
self.statuses.append(possible_source.status) self.status = possible_source.status
self.reasons.append(possible_source.reason) self.reason = possible_source.reason
self.bodies.append(None) self.source_headers = possible_source.getheaders()
self.source_headers.append(possible_source.getheaders()) if is_good_source(possible_source.status):
self.sources.append((possible_source, node)) self.body = None
if not self.newest: # one good source is enough return possible_source
return True
else: else:
if 'handoff_index' in node and \ self.body = possible_source.read()
(is_server_error(possible_source.status) or
possible_source.status == HTTP_NOT_FOUND) and \
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
# throw out 5XX and 404s from handoff nodes unless the data is
# really on disk and had been DELETEd
return False
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(possible_source.read())
self.source_headers.append(possible_source.getheaders())
# if 404, record the timestamp. If a good source shows up, its
# timestamp will be compared to the latest 404.
# For now checking only on objects, but future work could include
# the same check for account and containers. See lp 1560574.
if self.server_type == 'Object' and \
possible_source.status == HTTP_NOT_FOUND:
hdrs = HeaderKeyDict(possible_source.getheaders())
ts = Timestamp(hdrs.get('X-Backend-Timestamp', 0))
if ts > self.latest_404_timestamp:
self.latest_404_timestamp = ts
if possible_source.status == HTTP_INSUFFICIENT_STORAGE: if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.app.error_limit(node, _('ERROR Insufficient Storage')) self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status): elif is_server_error(possible_source.status):
self.app.error_occurred( self.app.error_occurred(
node, _('ERROR %(status)d %(body)s ' node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') % 'From Object Server') %
{'status': possible_source.status, {'status': possible_source.status,
'body': self.bodies[-1][:1024], 'body': self.body[:1024]})
'type': self.server_type}) return None
return False
def _get_source_and_node(self): @property
self.statuses = [] def source_and_node_iter(self):
self.reasons = [] if not hasattr(self, '_source_and_node_iter'):
self.bodies = [] self._source_and_node_iter = self._source_and_node_gen()
self.source_headers = [] return self._source_and_node_iter
self.sources = []
nodes = GreenthreadSafeIterator(self.node_iter) def _source_and_node_gen(self):
self.status = self.reason = self.body = self.source_headers = None
for node in self.node_iter:
source = self._make_node_request(
node, self.app.recoverable_node_timeout,
self.app.logger.thread_locals)
node_timeout = self.app.node_timeout if source:
if self.server_type == 'Object' and not self.newest: self.node = node
node_timeout = self.app.recoverable_node_timeout yield source, node
else:
yield None, None
self.status = self.reason = self.body = self.source_headers = None
pile = GreenAsyncPile(self.concurrency) def _dig_for_source_and_node(self):
# capture last used etag before continuation
for node in nodes: used_etag = self.last_headers.get('X-Object-Sysmeta-EC-ETag')
pile.spawn(self._make_node_request, node, node_timeout, for source, node in self.source_and_node_iter:
self.app.logger.thread_locals) if source and is_good_source(source.status) and \
_timeout = self.app.concurrency_timeout \ source.getheader('X-Object-Sysmeta-EC-ETag') == used_etag:
if pile.inflight < self.concurrency else None return source, node
if pile.waitfirst(_timeout):
break
else:
# ran out of nodes, see if any stragglers will finish
any(pile)
# this helps weed out any sucess status that were found before a 404
# and added to the list in the case of x-newest.
if self.sources:
self.sources = [s for s in self.sources
if source_key(s[0]) >= self.latest_404_timestamp]
if self.sources:
self.sources.sort(key=lambda s: source_key(s[0]))
source, node = self.sources.pop()
for src, _junk in self.sources:
close_swift_conn(src)
self.used_nodes.append(node)
src_headers = dict(
(k.lower(), v) for k, v in
source.getheaders())
# Save off the source etag so that, if we lose the connection
# and have to resume from a different node, we can be sure that
# we have the same object (replication) or a fragment archive
# from the same object (EC). Otherwise, if the cluster has two
# versions of the same object, we might end up switching between
# old and new mid-stream and giving garbage to the client.
self.used_source_etag = normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag', src_headers.get('etag', '')))
self.node = node
return source, node
return None, None return None, None
@@ -2805,11 +2788,9 @@ class ECObjectController(BaseObjectController):
backend_headers = self.generate_request_headers( backend_headers = self.generate_request_headers(
req, additional=req.headers) req, additional=req.headers)
getter = ECFragGetter(self.app, req, 'Object', node_iter, getter = ECFragGetter(self.app, req, node_iter, partition,
partition, req.swift_entity_path, policy, req.swift_entity_path, backend_headers,
backend_headers, header_provider=header_provider)
client_chunk_size=policy.fragment_size,
newest=False, header_provider=header_provider)
return (getter, getter.response_parts_iter(req)) return (getter, getter.response_parts_iter(req))
def _convert_range(self, req, policy): def _convert_range(self, req, policy):
@@ -2864,6 +2845,25 @@ class ECObjectController(BaseObjectController):
for s, e in new_ranges) for s, e in new_ranges)
return range_specs return range_specs
    def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy,
                                 buckets, feeder_q):
        while True:
            try:
                feeder_q.get(timeout=self.app.concurrency_timeout)
            except Empty:
                if safe_iter.unsafe_iter.primaries_left:
                    # this will run async, if it ends up taking the last
                    # primary we won't find out until the next pass
                    pile.spawn(self._fragment_GET_request,
                               req, safe_iter, partition,
                               policy, buckets.get_extra_headers)
                else:
                    # ran out of primaries
                    break
            else:
                # got a stop
                break
def _get_or_head_response(self, req, node_iter, partition, policy): def _get_or_head_response(self, req, node_iter, partition, policy):
update_etag_is_at_header(req, "X-Object-Sysmeta-Ec-Etag") update_etag_is_at_header(req, "X-Object-Sysmeta-Ec-Etag")
@@ -2887,27 +2887,24 @@ class ECObjectController(BaseObjectController):
safe_iter = GreenthreadSafeIterator(node_iter) safe_iter = GreenthreadSafeIterator(node_iter)
# Sending the request concurrently to all nodes, and responding ec_request_count = policy.ec_ndata + \
# with the first response isn't something useful for EC as all self.app.concurrent_ec_extra_requests
# nodes contain different fragments. Also EC has implemented it's with ContextPool(ec_request_count) as pool:
# own specific implementation of concurrent gets to ec_ndata nodes.
# So we don't need to worry about plumbing and sending a
# concurrency value to ECFragGetter.
with ContextPool(policy.ec_ndata) as pool:
pile = GreenAsyncPile(pool) pile = GreenAsyncPile(pool)
buckets = ECGetResponseCollection(policy) buckets = ECGetResponseCollection(policy)
node_iter.set_node_provider(buckets.provide_alternate_node) node_iter.set_node_provider(buckets.provide_alternate_node)
# include what may well be an empty X-Backend-Fragment-Preferences
# header from the buckets.get_extra_headers to let the object for node_count in range(ec_request_count):
# server know that it is ok to return non-durable fragments
for _junk in range(policy.ec_ndata):
pile.spawn(self._fragment_GET_request, pile.spawn(self._fragment_GET_request,
req, safe_iter, partition, req, safe_iter, partition,
policy, buckets.get_extra_headers) policy, buckets.get_extra_headers)
bad_bucket = ECGetResponseBucket(policy, None) feeder_q = None
bad_bucket.set_durable() if self.app.concurrent_gets:
best_bucket = None feeder_q = Queue()
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
partition, policy, buckets, feeder_q)
extra_requests = 0 extra_requests = 0
# max_extra_requests is an arbitrary hard limit for spawning extra # max_extra_requests is an arbitrary hard limit for spawning extra
# getters in case some unforeseen scenario, or a misbehaving object # getters in case some unforeseen scenario, or a misbehaving object
@@ -2917,52 +2914,33 @@ class ECObjectController(BaseObjectController):
# be limit at most 2 * replicas. # be limit at most 2 * replicas.
max_extra_requests = ( max_extra_requests = (
(policy.object_ring.replica_count * 2) - policy.ec_ndata) (policy.object_ring.replica_count * 2) - policy.ec_ndata)
for get, parts_iter in pile: for get, parts_iter in pile:
if get.last_status is None:
# We may have spawned getters that find the node iterator
# has been exhausted. Ignore them.
# TODO: turns out that node_iter.nodes_left can bottom
# out at >0 when number of devs in ring is < 2* replicas,
# which definitely happens in tests and results in status
# of None. We should fix that but keep this guard because
# there is also a race between testing nodes_left/spawning
# a getter and an existing getter calling next(node_iter).
continue
try: try:
if is_success(get.last_status): buckets.add_response(get, parts_iter)
# 2xx responses are managed by a response collection
buckets.add_response(get, parts_iter)
else:
# all other responses are lumped into a single bucket
bad_bucket.add_response(get, parts_iter)
except ValueError as err: except ValueError as err:
self.app.logger.error( self.app.logger.error(
_("Problem with fragment response: %s"), err) _("Problem with fragment response: %s"), err)
shortfall = bad_bucket.shortfall
best_bucket = buckets.best_bucket best_bucket = buckets.best_bucket
if best_bucket: if best_bucket.durable and best_bucket.shortfall <= 0:
shortfall = best_bucket.shortfall # good enough!
if not best_bucket.durable and shortfall <= 0: break
# be willing to go a *little* deeper, slowly requests_available = extra_requests < max_extra_requests and (
shortfall = 1 node_iter.nodes_left > 0 or buckets.has_alternate_node())
shortfall = min(shortfall, bad_bucket.shortfall) bad_resp = not is_good_source(get.last_status)
if (extra_requests < max_extra_requests and if requests_available and (
shortfall > pile._pending and buckets.shortfall > pile._pending or bad_resp):
(node_iter.nodes_left > 0 or
buckets.has_alternate_node())):
# we need more matching responses to reach ec_ndata
# than we have pending gets, as long as we still have
# nodes in node_iter we can spawn another
extra_requests += 1 extra_requests += 1
pile.spawn(self._fragment_GET_request, req, pile.spawn(self._fragment_GET_request,
safe_iter, partition, policy, req, safe_iter, partition,
buckets.get_extra_headers) policy, buckets.get_extra_headers)
if feeder_q:
feeder_q.put('stop')
# Put this back, since we *may* need it for kickoff()/_fix_response() # Put this back, since we *may* need it for kickoff()/_fix_response()
# (but note that _fix_ranges() may also pop it back off before then) # (but note that _fix_ranges() may also pop it back off before then)
req.range = orig_range req.range = orig_range
if best_bucket and best_bucket.shortfall <= 0 and best_bucket.durable: best_bucket = buckets.best_bucket
if best_bucket.shortfall <= 0 and best_bucket.durable:
# headers can come from any of the getters # headers can come from any of the getters
resp_headers = best_bucket.headers resp_headers = best_bucket.headers
resp_headers.pop('Content-Range', None) resp_headers.pop('Content-Range', None)
@@ -2975,8 +2953,7 @@ class ECObjectController(BaseObjectController):
app_iter = ECAppIter( app_iter = ECAppIter(
req.swift_entity_path, req.swift_entity_path,
policy, policy,
[parts_iter for [p_iter for _getter, p_iter in best_bucket.get_responses()],
_getter, parts_iter in best_bucket.get_responses()],
range_specs, fa_length, obj_length, range_specs, fa_length, obj_length,
self.app.logger) self.app.logger)
resp = Response( resp = Response(
@@ -3002,25 +2979,28 @@ class ECObjectController(BaseObjectController):
reasons = [] reasons = []
bodies = [] bodies = []
headers = [] headers = []
for getter, _parts_iter in bad_bucket.get_responses(): for status, bad_bucket in buckets.bad_buckets.items():
if best_bucket and best_bucket.durable: for getter, _parts_iter in bad_bucket.get_responses():
bad_resp_headers = HeaderKeyDict(getter.last_headers) if best_bucket.durable:
t_data_file = bad_resp_headers.get( bad_resp_headers = getter.last_headers
'X-Backend-Data-Timestamp') t_data_file = bad_resp_headers.get(
t_obj = bad_resp_headers.get( 'X-Backend-Data-Timestamp')
'X-Backend-Timestamp', t_obj = bad_resp_headers.get(
bad_resp_headers.get('X-Timestamp')) 'X-Backend-Timestamp',
bad_ts = Timestamp(t_data_file or t_obj or '0') bad_resp_headers.get('X-Timestamp'))
if bad_ts <= Timestamp(best_bucket.timestamp_str): bad_ts = Timestamp(t_data_file or t_obj or '0')
# We have reason to believe there's still good data if bad_ts <= best_bucket.timestamp:
# out there, it's just currently unavailable # We have reason to believe there's still good data
continue # out there, it's just currently unavailable
statuses.extend(getter.statuses) continue
reasons.extend(getter.reasons) if getter.status:
bodies.extend(getter.bodies) statuses.append(getter.status)
headers.extend(getter.source_headers) reasons.append(getter.reason)
bodies.append(getter.body)
headers.append(getter.source_headers)
if not statuses and best_bucket and not best_bucket.durable: if not statuses and is_success(best_bucket.status) and \
not best_bucket.durable:
# pretend that non-durable bucket was 404s # pretend that non-durable bucket was 404s
statuses.append(404) statuses.append(404)
reasons.append('404 Not Found') reasons.append('404 Not Found')
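
To make the reworked ECGetResponseBucket.shortfall behaviour from earlier
in this diff concrete, here is a small worked example (illustrative only;
it assumes a hypothetical 10+4 policy with a replica count of 14 and omits
the 416 special case):

    ec_ndata, ec_nparity, replica_count = 10, 4, 14

    def shortfall(resp_count, durable):
        # durable buckets only need ndata responses; non-durable buckets
        # keep the proxy digging toward more primaries
        if durable:
            return max(ec_ndata - resp_count, 0)
        alt_count = min(replica_count - resp_count, ec_nparity)
        return max([1, ec_ndata - resp_count, alt_count])

    print(shortfall(10, durable=True))   # 0 - enough durable fragments
    print(shortfall(10, durable=False))  # 4 - keep looking for a durable set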


@@ -263,6 +263,8 @@ class Application(object):
         self.concurrent_gets = config_true_value(conf.get('concurrent_gets'))
         self.concurrency_timeout = float(conf.get('concurrency_timeout',
                                                    self.conn_timeout))
+        self.concurrent_ec_extra_requests = int(
+            conf.get('concurrent_ec_extra_requests', 0))
         value = conf.get('request_node_count', '2 * replicas').lower().split()
         if len(value) == 1:
             rnc_value = int(value[0])


@@ -21,29 +21,27 @@ import mock
import six import six
from swift.proxy import server as proxy_server
from swift.proxy.controllers.base import headers_to_container_info, \ from swift.proxy.controllers.base import headers_to_container_info, \
headers_to_account_info, headers_to_object_info, get_container_info, \ headers_to_account_info, headers_to_object_info, get_container_info, \
get_cache_key, get_account_info, get_info, get_object_info, \ get_cache_key, get_account_info, get_info, get_object_info, \
Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \ Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \
set_info_cache set_info_cache, NodeIter
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \ from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
bytes_to_wsgi bytes_to_wsgi
from swift.common import exceptions from swift.common import exceptions
from swift.common.utils import split_path, ShardRange, Timestamp from swift.common.utils import split_path, ShardRange, Timestamp, \
GreenthreadSafeIterator, GreenAsyncPile
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.http import is_success from swift.common.http import is_success
from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection
from test.unit import ( from test.unit import (
fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, FakeLogger, fake_http_connect, FakeRing, FakeMemcache, PatchPolicies,
make_timestamp_iter, make_timestamp_iter, mocked_http_conn, patch_policies, debug_logger)
mocked_http_conn)
from swift.proxy import server as proxy_server
from swift.common.request_helpers import ( from swift.common.request_helpers import (
get_sys_meta_prefix, get_object_transient_sysmeta get_sys_meta_prefix, get_object_transient_sysmeta
) )
from test.unit import patch_policies
class FakeResponse(object): class FakeResponse(object):
@@ -179,13 +177,22 @@ class FakeCache(FakeMemcache):
return self.stub or self.store.get(key) return self.stub or self.store.get(key)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class BaseTest(unittest.TestCase):
class TestFuncs(unittest.TestCase):
def setUp(self): def setUp(self):
self.app = proxy_server.Application(None, self.logger = debug_logger()
account_ring=FakeRing(), self.cache = FakeCache()
container_ring=FakeRing(), self.conf = {}
logger=FakeLogger()) self.account_ring = FakeRing()
self.container_ring = FakeRing()
self.app = proxy_server.Application(self.conf,
logger=self.logger,
account_ring=self.account_ring,
container_ring=self.container_ring)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestFuncs(BaseTest):
def test_get_info_zero_recheck(self): def test_get_info_zero_recheck(self):
mock_cache = mock.Mock() mock_cache = mock.Mock()
@@ -1325,3 +1332,76 @@ class TestFuncs(unittest.TestCase):
self.assertIn('Failed to get container listing', warning_lines[0]) self.assertIn('Failed to get container listing', warning_lines[0])
self.assertIn('/a/c', warning_lines[0]) self.assertIn('/a/c', warning_lines[0])
self.assertFalse(warning_lines[1:]) self.assertFalse(warning_lines[1:])
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestNodeIter(BaseTest):
def test_iter_default_fake_ring(self):
for ring in (self.account_ring, self.container_ring):
self.assertEqual(ring.replica_count, 3.0)
node_iter = NodeIter(self.app, ring, 0)
self.assertEqual(6, node_iter.nodes_left)
self.assertEqual(3, node_iter.primaries_left)
count = 0
for node in node_iter:
count += 1
self.assertEqual(count, 3)
self.assertEqual(0, node_iter.primaries_left)
# default fake_ring has NO handoffs, so nodes_left is kind of a lie
self.assertEqual(3, node_iter.nodes_left)
def test_iter_with_handoffs(self):
ring = FakeRing(replicas=3, max_more_nodes=20) # handoffs available
policy = StoragePolicy(0, 'zero', object_ring=ring)
node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy)
self.assertEqual(6, node_iter.nodes_left)
self.assertEqual(3, node_iter.primaries_left)
primary_indexes = set()
handoff_indexes = []
count = 0
for node in node_iter:
if 'index' in node:
primary_indexes.add(node['index'])
else:
handoff_indexes.append(node['handoff_index'])
count += 1
self.assertEqual(count, 6)
self.assertEqual(0, node_iter.primaries_left)
self.assertEqual(0, node_iter.nodes_left)
self.assertEqual({0, 1, 2}, primary_indexes)
self.assertEqual([0, 1, 2], handoff_indexes)
def test_multi_iteration(self):
ring = FakeRing(replicas=8, max_more_nodes=20)
policy = StoragePolicy(0, 'ec', object_ring=ring)
# sanity
node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy)
self.assertEqual(16, len([n for n in node_iter]))
node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy)
self.assertEqual(16, node_iter.nodes_left)
self.assertEqual(8, node_iter.primaries_left)
pile = GreenAsyncPile(5)
def eat_node(node_iter):
return next(node_iter)
safe_iter = GreenthreadSafeIterator(node_iter)
for i in range(5):
pile.spawn(eat_node, safe_iter)
nodes = []
for node in pile:
nodes.append(node)
primary_indexes = {n['index'] for n in nodes}
self.assertEqual(5, len(primary_indexes))
self.assertEqual(3, node_iter.primaries_left)
# it's problematic we don't decrement nodes_left until we resume
self.assertEqual(12, node_iter.nodes_left)
for node in node_iter:
nodes.append(node)
self.assertEqual(17, len(nodes))


@@ -26,7 +26,8 @@ import json
from hashlib import md5 from hashlib import md5
import mock import mock
from eventlet import Timeout from eventlet import Timeout, sleep
from eventlet.queue import Empty
import six import six
from six import StringIO from six import StringIO
@@ -2411,6 +2412,249 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
resp = req.get_response(self.app) resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200) self.assertEqual(resp.status_int, 200)
def test_GET_no_response_error(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect():
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
def test_feed_remaining_primaries(self):
controller = self.controller_cls(
self.app, 'a', 'c', 'o')
safe_iter = utils.GreenthreadSafeIterator(self.app.iter_nodes(
self.policy.object_ring, 0, policy=self.policy))
controller._fragment_GET_request = lambda *a, **k: next(safe_iter)
pile = utils.GreenAsyncPile(self.policy.ec_ndata)
for i in range(self.policy.ec_ndata):
pile.spawn(controller._fragment_GET_request)
req = swob.Request.blank('/v1/a/c/o')
feeder_q = mock.MagicMock()
def feeder_timeout(*a, **kw):
# simulate trampoline
sleep()
# timeout immediately
raise Empty
feeder_q.get.side_effect = feeder_timeout
controller.feed_remaining_primaries(
safe_iter, pile, req, 0, self.policy, mock.MagicMock(), feeder_q)
expected_call = mock.call(timeout=self.app.concurrency_timeout)
expected_num_calls = self.policy.ec_nparity + 1
self.assertEqual(feeder_q.get.call_args_list,
[expected_call] * expected_num_calls)
def test_GET_timeout(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.recoverable_node_timeout = 0.01
codes = [FakeStatus(404, response_sleep=1.0)] + \
[200] * (self.policy.ec_ndata)
with mocked_http_conn(*codes) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(self.policy.ec_ndata + 1, len(log.requests))
def test_GET_with_slow_primaries(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-743]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
status_codes = ([
FakeStatus(200, response_sleep=2.0),
] * self.policy.ec_nparity) + ([
FakeStatus(200),
] * self.policy.ec_ndata)
self.assertEqual(len(status_codes), len(ec_archive_bodies))
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_some_slow_primaries(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-289]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
slow_count = self.policy.ec_nparity
status_codes = ([
FakeStatus(200, response_sleep=2.0),
] * slow_count) + ([
FakeStatus(200),
] * (self.policy.ec_ndata - slow_count))
random.shuffle(status_codes)
status_codes.extend([
FakeStatus(200),
] * slow_count)
self.assertEqual(len(status_codes), len(ec_archive_bodies))
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_slow_nodes_and_failures(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-289]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
unused_resp = [
FakeStatus(200, response_sleep=2.0),
FakeStatus(200, response_sleep=2.0),
500,
416,
]
self.assertEqual(len(unused_resp), self.policy.ec_nparity)
status_codes = (
[200] * (self.policy.ec_ndata - 4)) + unused_resp
self.assertEqual(len(status_codes), self.policy.ec_ndata)
# random.shuffle(status_codes)
# make up for the failures
status_codes.extend([200] * self.policy.ec_nparity)
self.assertEqual(len(status_codes), len(ec_archive_bodies))
bodies_with_errors = []
for code, body in zip(status_codes, ec_archive_bodies):
if code == 500:
bodies_with_errors.append('Kaboom')
elif code == 416:
bodies_with_errors.append('That Range is no.')
else:
bodies_with_errors.append(body)
with mocked_http_conn(*status_codes, body_iter=bodies_with_errors,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_one_slow_frag_lane(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
status_codes = [
FakeStatus(200, response_sleep=2.0),
] + ([
FakeStatus(200),
] * (self.policy.ec_ndata - 1))
random.shuffle(status_codes)
status_codes.extend([
FakeStatus(200, response_sleep=2.0),
FakeStatus(200, response_sleep=2.0),
FakeStatus(200, response_sleep=2.0),
FakeStatus(200),
])
self.assertEqual(len(status_codes), len(ec_archive_bodies))
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_concurrent_ec_extra_requests(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
self.app.concurrent_ec_extra_requests = self.policy.ec_nparity - 1
req = swift.common.swob.Request.blank('/v1/a/c/o')
status_codes = [200] * (self.policy.object_ring.replicas - 1)
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.object_ring.replicas - 1)
self.assertEqual(resp.body, test_data)
def test_GET_with_body(self): def test_GET_with_body(self):
req = swift.common.swob.Request.blank('/v1/a/c/o') req = swift.common.swob.Request.blank('/v1/a/c/o')
# turn a real body into fragments # turn a real body into fragments
@@ -2576,7 +2820,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
-with capture_http_requests(fake_response) as log:
+with mock.patch('swift.proxy.server.shuffle', lambda n: n), \
+        capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
@@ -2589,17 +2834,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
collected_responses[etag].add(index)
-# because the primary nodes are shuffled, it's possible the proxy
-# didn't even notice the missed overwrite frag - but it might have
-self.assertLessEqual(len(log), self.policy.ec_ndata + 1)
-self.assertLessEqual(len(collected_responses), 2)
-# ... regardless we should never need to fetch more than ec_ndata
-# frags for any given etag
-for etag, frags in collected_responses.items():
-    self.assertLessEqual(len(frags), self.policy.ec_ndata,
-                         'collected %s frags for etag %s' % (
-                             len(frags), etag))
+self.assertEqual(len(log), self.policy.ec_ndata + 1)
+expected = {
+    obj1['etag']: 1,
+    obj2['etag']: self.policy.ec_ndata,
+}
+self.assertEqual(expected, {
+    e: len(f) for e, f in collected_responses.items()})
def test_GET_with_many_missed_overwrite_will_need_handoff(self):
obj1 = self._make_ec_object_stub(pattern='obj1')
@@ -2857,7 +3098,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
collected_indexes[fi].append(conn)
self.assertEqual(len(collected_indexes), 7)
-def test_GET_with_mixed_nondurable_frags_and_no_quorum_will_503(self):
+def test_GET_with_mixed_nondurable_frags_and_will_404(self):
# all nodes have a frag but there is no one set that reaches quorum,
# which means there is no backend 404 response, but proxy should still
# return 404 rather than 503
@@ -2919,10 +3160,72 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
collected_etags)
self.assertEqual({200}, collected_status)
-def test_GET_with_mixed_frags_and_no_quorum_will_503(self):
+def test_GET_with_mixed_durable_and_nondurable_frags_will_503(self):
# all nodes have a frag but there is no one set that reaches quorum,
-# but since they're all marked durable (so we *should* be able to
-# reconstruct), proxy will 503
+# but since one is marked durable we *should* be able to reconstruct,
+# so proxy should 503
obj1 = self._make_ec_object_stub(pattern='obj1')
obj2 = self._make_ec_object_stub(pattern='obj2')
obj3 = self._make_ec_object_stub(pattern='obj3')
obj4 = self._make_ec_object_stub(pattern='obj4')
node_frags = [
{'obj': obj1, 'frag': 0, 'durable': False},
{'obj': obj2, 'frag': 0, 'durable': False},
{'obj': obj3, 'frag': 0, 'durable': False},
{'obj': obj1, 'frag': 1, 'durable': False},
{'obj': obj2, 'frag': 1, 'durable': False},
{'obj': obj3, 'frag': 1, 'durable': False},
{'obj': obj1, 'frag': 2, 'durable': False},
{'obj': obj2, 'frag': 2, 'durable': False},
{'obj': obj3, 'frag': 2, 'durable': False},
{'obj': obj1, 'frag': 3, 'durable': False},
{'obj': obj2, 'frag': 3, 'durable': False},
{'obj': obj3, 'frag': 3, 'durable': False},
{'obj': obj1, 'frag': 4, 'durable': False},
{'obj': obj2, 'frag': 4, 'durable': False},
{'obj': obj3, 'frag': 4, 'durable': False},
{'obj': obj1, 'frag': 5, 'durable': False},
{'obj': obj2, 'frag': 5, 'durable': False},
{'obj': obj3, 'frag': 5, 'durable': False},
{'obj': obj1, 'frag': 6, 'durable': False},
{'obj': obj2, 'frag': 6, 'durable': False},
{'obj': obj3, 'frag': 6, 'durable': False},
{'obj': obj1, 'frag': 7, 'durable': False},
{'obj': obj2, 'frag': 7, 'durable': False},
{'obj': obj3, 'frag': 7},
{'obj': obj1, 'frag': 8, 'durable': False},
{'obj': obj2, 'frag': 8, 'durable': False},
{'obj': obj3, 'frag': 8, 'durable': False},
{'obj': obj4, 'frag': 8, 'durable': False},
]
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
collected_etags = set()
collected_status = set()
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
collected_etags.add(etag)
collected_status.add(conn.resp.status)
# default node_iter will exhaust at 2 * replicas
self.assertEqual(len(log), 2 * self.replicas())
self.assertEqual(
{obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
collected_etags)
self.assertEqual({200}, collected_status)
def test_GET_with_mixed_durable_frags_and_no_quorum_will_503(self):
# all nodes have a frag but there is no one set that reaches quorum,
# and since at least one is marked durable we *should* be able to
# reconstruct, so proxy will 503
obj1 = self._make_ec_object_stub(pattern='obj1')
obj2 = self._make_ec_object_stub(pattern='obj2')
obj3 = self._make_ec_object_stub(pattern='obj3')
@@ -3001,7 +3304,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
{'obj': obj1, 'frag': 11, 'durable': False},  # parity
{'obj': obj1, 'frag': 12, 'durable': False},  # parity
{'obj': obj1, 'frag': 13, 'durable': False},  # parity
-]  # handoffs not used in this scenario
+] + [[]] * self.replicas()  # handoffs all 404
fake_response = self._fake_ec_node_response(list(node_frags))
@@ -3013,9 +3316,11 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.headers['etag'], obj1['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj1['etag'])
-self.assertEqual(self.policy.ec_ndata, len(log))
+self.assertGreaterEqual(len(log), self.policy.ec_ndata)
collected_durables = []
for conn in log:
+if not conn.resp.headers.get('X-Backend-Data-Timestamp'):
+    continue
if (conn.resp.headers.get('X-Backend-Durable-Timestamp')
== conn.resp.headers.get('X-Backend-Data-Timestamp')):
collected_durables.append(conn)
@@ -3044,7 +3349,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
{'obj': obj1, 'frag': 11, 'durable': False},  # parity
{'obj': obj1, 'frag': 12, 'durable': False},  # parity
{'obj': obj1, 'frag': 13, 'durable': False},  # parity
-]  # handoffs not used in this scenario
+] + [[]] * self.replicas()  # handoffs all 404
fake_response = self._fake_ec_node_response(list(node_frags))
@@ -3058,6 +3363,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
collected_durables = []
for conn in log:
+if not conn.resp.headers.get('X-Backend-Data-Timestamp'):
+    continue
if (conn.resp.headers.get('X-Backend-Durable-Timestamp')
== conn.resp.headers.get('X-Backend-Data-Timestamp')):
collected_durables.append(conn)
@@ -3231,6 +3538,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
# min: proxy will GET 10 non-durable obj1 frags and then 10 obj frags
self.assertGreaterEqual(len(log), 2 * self.policy.ec_ndata)
+def test_GET_with_missing_durables_and_older_obscured_durables(self):
# scenario: obj3 has 14 frags but only 2 are durable and these are
# obscured by two non-durable frags of obj1. There is also a single
# non-durable frag of obj2. The proxy will need to do at least 10
@@ -3259,7 +3567,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
[{'obj': obj3, 'frag': 11, 'durable': False}],
[{'obj': obj3, 'frag': 12, 'durable': False}],
[{'obj': obj3, 'frag': 13, 'durable': False}],
-]
+] + [[]] * self.replicas()  # handoffs 404
fake_response = self._fake_ec_node_response(list(node_frags))
@@ -3271,7 +3579,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.headers['etag'], obj3['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj3['etag'])
self.assertGreaterEqual(len(log), self.policy.ec_ndata + 1)
-self.assertLessEqual(len(log), self.policy.ec_ndata + 4)
+self.assertLessEqual(len(log), (self.policy.ec_ndata * 2) + 1)
def test_GET_with_missing_durables_and_older_non_durables(self):
# scenario: non-durable frags of newer obj1 obscure all frags
@@ -3453,7 +3761,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
StubResponse(416, frag_index=4),
StubResponse(416, frag_index=5),
StubResponse(416, frag_index=6),
-# sneak in bogus extra responses
+# sneak a couple bogus extra responses
StubResponse(404),
StubResponse(206, frag_index=8),
# and then just "enough" more 416's
@@ -3471,8 +3779,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 416)
-# ec_ndata responses that must agree, plus the bogus extras
-self.assertEqual(len(log), self.policy.ec_ndata + 2)
+# we're going to engage ndata primaries, plus the bogus extra
+# self.assertEqual(len(log), self.policy.ec_ndata + 2)
+self.assertEqual([c.resp.status for c in log],
+                 ([416] * 7) + [404, 206] + ([416] * 3))
def test_GET_with_missing_and_range_unsatisifiable(self):
responses = [  # not quite ec_ndata frags on primaries
@@ -3700,7 +4010,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
status_codes, body_iter, headers = zip(*responses + [
(404, [b''], {}) for i in range(
self.policy.object_ring.max_more_nodes)])
-with set_http_connect(*status_codes, body_iter=body_iter,
+with mocked_http_conn(*status_codes, body_iter=body_iter,
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
@@ -3708,8 +4018,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
# resume but won't be able to give us all the right bytes
self.assertNotEqual(md5(resp.body).hexdigest(), etag)
error_lines = self.logger.get_lines_for_level('error')
-self.assertEqual(self.replicas(), len(error_lines))
nparity = self.policy.ec_nparity
+self.assertGreater(len(error_lines), nparity)
for line in error_lines[:nparity]:
self.assertIn('retrying', line)
for line in error_lines[nparity:]:
@@ -3720,7 +4030,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
test_data = (b'test' * segment_size)[:-333]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
-headers = {'X-Object-Sysmeta-Ec-Etag': etag}
+headers = {
+    'X-Object-Sysmeta-Ec-Etag': etag,
+    'X-Object-Sysmeta-Ec-Content-Length': len(test_data),
+}
self.app.recoverable_node_timeout = 0.05
# first one is slow
responses = [(200, SlowBody(ec_archive_bodies[0], 0.1),
@@ -3737,11 +4050,100 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
-self.assertTrue(md5(resp.body).hexdigest(), etag)
+self.assertEqual(md5(resp.body).hexdigest(), etag)
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertIn('retrying', error_lines[0])
def test_GET_read_timeout_resume_mixed_etag(self):
segment_size = self.policy.ec_segment_size
test_data2 = (b'blah1' * segment_size)[:-333]
test_data1 = (b'test' * segment_size)[:-333]
etag2 = md5(test_data2).hexdigest()
etag1 = md5(test_data1).hexdigest()
ec_archive_bodies2 = self._make_ec_archive_bodies(test_data2)
ec_archive_bodies1 = self._make_ec_archive_bodies(test_data1)
headers2 = {'X-Object-Sysmeta-Ec-Etag': etag2,
'X-Object-Sysmeta-Ec-Content-Length': len(test_data2),
'X-Backend-Timestamp': self.ts().internal}
headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1,
'X-Object-Sysmeta-Ec-Content-Length': len(test_data1),
'X-Backend-Timestamp': self.ts().internal}
responses = [
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[1], self._add_frag_index(1, headers1)),
# 404
(404, [b''], {}),
# etag1
(200, SlowBody(ec_archive_bodies1[3], 0.1), self._add_frag_index(
3, headers1)),
# etag2
(200, ec_archive_bodies2[4], self._add_frag_index(4, headers2)),
# etag1
(200, ec_archive_bodies1[5], self._add_frag_index(5, headers1)),
# etag2
(200, ec_archive_bodies2[6], self._add_frag_index(6, headers2)),
# etag1
(200, ec_archive_bodies1[7], self._add_frag_index(7, headers1)),
# etag2
(200, ec_archive_bodies2[8], self._add_frag_index(8, headers2)),
# etag1
(200, SlowBody(ec_archive_bodies1[9], 0.1), self._add_frag_index(
9, headers1)),
# etag2
(200, ec_archive_bodies2[10], self._add_frag_index(10, headers2)),
# etag1
(200, ec_archive_bodies1[11], self._add_frag_index(11, headers1)),
# etag2
(200, ec_archive_bodies2[12], self._add_frag_index(12, headers2)),
# 404
(404, [b''], {}),
# handoffs start here
# etag2
(200, ec_archive_bodies2[0], self._add_frag_index(0, headers2)),
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[2], self._add_frag_index(2, headers1)),
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[4], self._add_frag_index(4, headers1)),
# etag2
(200, ec_archive_bodies2[1], self._add_frag_index(1, headers2)),
# etag1
(200, ec_archive_bodies1[6], self._add_frag_index(6, headers1)),
# etag2
(200, ec_archive_bodies2[7], self._add_frag_index(7, headers2)),
# etag1
(200, ec_archive_bodies1[8], self._add_frag_index(8, headers1)),
# resume requests start here
# 404
(404, [b''], {}),
# etag2
(200, ec_archive_bodies2[3], self._add_frag_index(3, headers2)),
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[10], self._add_frag_index(10, headers1)),
# etag1
(200, ec_archive_bodies1[12], self._add_frag_index(12, headers1)),
]
self.app.recoverable_node_timeout = 0.01
req = swob.Request.blank('/v1/a/c/o')
status_codes, body_iter, headers = zip(*responses)
with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(md5(resp.body).hexdigest(), etag1)
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(2, len(error_lines))
for line in error_lines:
self.assertIn('retrying', line)
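The 'retrying' error lines come from the proxy abandoning a fragment read that timed out mid-body and resuming it from another node; the replacement request only needs the bytes that were not received yet. A hedged sketch of that resume arithmetic (resume_range_header is a hypothetical helper, not the proxy's actual code):
def resume_range_header(bytes_already_received, frag_size):
    # re-request only the unread tail of the fragment archive
    return {'Range': 'bytes=%d-%d' % (bytes_already_received, frag_size - 1)}

assert resume_range_header(1024, 4096) == {'Range': 'bytes=1024-4095'}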
def test_fix_response_HEAD(self):
headers = {'X-Object-Sysmeta-Ec-Content-Length': '10',
'X-Object-Sysmeta-Ec-Etag': 'foo'}
@@ -3825,6 +4227,43 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.etag, body_etag)
self.assertEqual(resp.headers['Accept-Ranges'], 'bytes')
def test_non_durable_ec_response_bucket(self):
ts = self.ts()
bucket = obj.ECGetResponseBucket(self.policy, ts)
self.assertEqual(bucket.shortfall, self.policy.ec_ndata)
for i in range(1, self.policy.ec_ndata - self.policy.ec_nparity + 1):
stub_getter = mock.MagicMock(last_status=200, last_headers={
'X-Backend-Timestamp': ts.internal,
'X-Object-Sysmeta-Ec-Etag': 'the-etag',
'X-Object-Sysmeta-Ec-Frag-Index': str(i),
})
bucket.add_response(stub_getter, None)
self.assertEqual(bucket.shortfall, self.policy.ec_ndata - i)
self.assertEqual(bucket.shortfall, self.policy.ec_nparity)
self.assertFalse(bucket.durable)
expectations = (
4, # 7
4, # 8
4, # 9
4, # 10
3, # 11
2, # 12
1, # 13
1, # 14
)
for i, expected in zip(range(
self.policy.ec_ndata - self.policy.ec_nparity + 1,
self.policy.object_ring.replica_count + 1), expectations):
stub_getter = mock.MagicMock(last_status=200, last_headers={
'X-Backend-Timestamp': ts.internal,
'X-Object-Sysmeta-Ec-Etag': 'the-etag',
'X-Object-Sysmeta-Ec-Frag-Index': str(i),
})
bucket.add_response(stub_getter, None)
msg = 'With %r resp, expected shortfall %s != %s' % (
bucket.gets.keys(), expected, bucket.shortfall)
self.assertEqual(bucket.shortfall, expected, msg)
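The expectations tuple above is the shortfall curve for a non-durable bucket in a 10+4 policy: it never drops below one, and while untried primaries remain it keeps asking for up to nparity additional responses. One formula that reproduces exactly these numbers (a sketch, not necessarily the literal property implementation):
def non_durable_shortfall(resp_count, ec_ndata=10, ec_nparity=4, replicas=14):
    # assumed 10+4 geometry; the three terms: never claim completeness,
    # still need ndata frags, and keep pulling up to nparity alternates
    alternates = min(replicas - resp_count, ec_nparity)
    return max(1, ec_ndata - resp_count, alternates)

assert [non_durable_shortfall(n) for n in range(7, 15)] == [
    4, 4, 4, 4, 3, 2, 1, 1]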
class TestECFunctions(unittest.TestCase):
def test_chunk_transformer(self):
@@ -3969,7 +4408,8 @@ class TestECDuplicationObjController(
# the backend requests will stop at enough ec_ndata responses
self.assertEqual(
len(frags), self.policy.ec_ndata,
-'collected %s frags for etag %s' % (len(frags), etag))
+'collected %s frags (expected %s) for etag %s' % (
+    len(frags), self.policy.ec_ndata, etag))
# TODO: actually "frags" in node_frags is meaning "node_index" right now
# in following tests. Reconsidering the name and semantics change needed.

View File

@@ -1337,6 +1337,20 @@ class TestProxyServerLoading(unittest.TestCase):
self.assertEqual(app.post_quorum_timeout, 0.3)
self.assertEqual(app.concurrency_timeout, 0.2)
def test_concurrent_ec_options(self):
conf = {
'concurrent_gets': 'on',
'concurrency_timeout': '0.5',
'concurrent_ec_extra_requests': '4',
}
for policy in POLICIES:
policy.object_ring = FakeRing()
app = proxy_server.Application(conf, debug_logger(),
FakeRing(), FakeRing())
self.assertEqual(app.concurrent_ec_extra_requests, 4)
self.assertEqual(app.concurrent_gets, True)
self.assertEqual(app.concurrency_timeout, 0.5)
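A hedged sketch of the value coercion the assertions above imply; the attribute names come from the test, but the defaults shown are illustrative and the real wiring lives in the proxy Application constructor:
from swift.common.utils import config_true_value

conf = {'concurrent_gets': 'on', 'concurrency_timeout': '0.5',
        'concurrent_ec_extra_requests': '4'}
concurrent_gets = config_true_value(conf.get('concurrent_gets'))   # True
concurrency_timeout = float(conf.get('concurrency_timeout', 0.5))  # 0.5
concurrent_ec_extra_requests = int(
    conf.get('concurrent_ec_extra_requests', 0))                   # 4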
def test_load_policy_rings(self):
for policy in POLICIES:
self.assertFalse(policy.object_ring)
@@ -4687,12 +4701,21 @@ class TestReplicatedObjectController(
object_ring.max_more_nodes = 0
def test_iter_nodes_calls_sort_nodes(self):
-with mock.patch.object(self.app, 'sort_nodes') as sort_nodes:
+called = []
+def fake_sort_nodes(nodes, **kwargs):
+    # caller might mutate the list we return during iteration, we're
+    # interested in the value as of call time
+    called.append(mock.call(list(nodes), **kwargs))
+    return nodes
+with mock.patch.object(self.app, 'sort_nodes',
+                        side_effect=fake_sort_nodes):
object_ring = self.app.get_object_ring(None)
for node in self.app.iter_nodes(object_ring, 0):
pass
-sort_nodes.assert_called_once_with(
-    object_ring.get_part_nodes(0), policy=None)
+self.assertEqual(called, [
+    mock.call(object_ring.get_part_nodes(0), policy=None)
+])
def test_iter_nodes_skips_error_limited(self):
with mock.patch.object(self.app, 'sort_nodes',