Merge "Extend concurrent_gets to EC GET requests"

Zuul 2020-09-04 14:22:19 +00:00 committed by Gerrit Code Review
commit 20e1544ad8
7 changed files with 875 additions and 336 deletions

View File

@ -200,12 +200,14 @@ use = egg:swift#proxy
# the number of seconds configured by timing_expiry.
# timing_expiry = 300
#
# By default on a GET/HEAD swift will connect to a storage node one at a time
# in a single thread. There is smarts in the order they are hit however. If you
# turn on concurrent_gets below, then replica count threads will be used.
# With addition of the concurrency_timeout option this will allow swift to send
# out GET/HEAD requests to the storage nodes concurrently and answer with the
# first to respond. With an EC policy the parameter only affects HEAD requests.
# By default on a GET/HEAD swift will connect to a minimum number of storage
# nodes in a minimum number of threads - for replicated data just a single
# request to a single node, one at a time. When enabled, concurrent_gets
# allows the proxy to use up to replica count threads when waiting on a
# response. In conjunction with the concurrency_timeout option this will
# allow swift to send out GET/HEAD requests to the storage nodes concurrently
# and answer as soon as the minimum number of backend responses is available
# - in replicated contexts this will be the first backend replica to respond.
# concurrent_gets = off
#
# This parameter controls how long to wait before firing off the next
@ -215,6 +217,13 @@ use = egg:swift#proxy
# conn_timeout parameter.
# concurrency_timeout = 0.5
#
# By default on an EC GET request swift will connect to a minimum number of
# storage nodes in a minimum number of threads - for erasure coded data, ndata
# requests to primary nodes are started at the same time. When greater than
# zero this option provides additional robustness and may reduce first byte
# latency by starting additional requests - up to as many as nparity.
# concurrent_ec_extra_requests = 0
#
# Set to the number of nodes to contact for a normal request. You can use
# '* replicas' at the end to have it use the number given times the number of
# replicas for the ring being used for the request.
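To see how these options fit together, the short sketch below mirrors the option parsing added to the proxy Application and the request count used on the EC GET path in this change; the sample values and the 10+4 policy figures are assumptions for illustration only.

# Sketch only: how the sample options above shape an EC GET, assuming a
# hypothetical 10+4 policy. Mirrors the parsing in proxy.server.Application.
from swift.common.utils import config_true_value

conf = {'concurrent_gets': 'on',
        'concurrency_timeout': '0.5',
        'concurrent_ec_extra_requests': '2'}
concurrent_gets = config_true_value(conf.get('concurrent_gets'))
concurrency_timeout = float(conf.get('concurrency_timeout', 0.5))
concurrent_ec_extra_requests = int(conf.get('concurrent_ec_extra_requests', 0))

ec_ndata, ec_nparity = 10, 4  # assumed example policy
# the EC GET path starts ndata + extra fragment requests straight away ...
initial_requests = ec_ndata + concurrent_ec_extra_requests  # 12
# ... and, with concurrent_gets on, a feeder may start one more primary
# every concurrency_timeout seconds until the primaries are exhausted.
print(initial_requests, concurrent_gets, concurrency_timeout)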

View File

@ -1488,18 +1488,22 @@ class NodeIter(object):
if node_iter is None:
node_iter = itertools.chain(
part_nodes, ring.get_more_nodes(partition))
num_primary_nodes = len(part_nodes)
self.nodes_left = self.app.request_node_count(num_primary_nodes)
self.expected_handoffs = self.nodes_left - num_primary_nodes
self.num_primary_nodes = len(part_nodes)
self.nodes_left = self.app.request_node_count(self.num_primary_nodes)
self.expected_handoffs = self.nodes_left - self.num_primary_nodes
# Use of list() here forcibly yanks the first N nodes (the primary
# nodes) from node_iter, so the rest of its values are handoffs.
self.primary_nodes = self.app.sort_nodes(
list(itertools.islice(node_iter, num_primary_nodes)),
list(itertools.islice(node_iter, self.num_primary_nodes)),
policy=policy)
self.handoff_iter = node_iter
self._node_provider = None
@property
def primaries_left(self):
return len(self.primary_nodes)
def __iter__(self):
self._node_iter = self._node_gen()
return self
@ -1523,7 +1527,7 @@ class NodeIter(object):
self.app.logger.increment('handoff_count')
self.app.logger.warning(
'Handoff requested (%d)' % handoffs)
if (extra_handoffs == len(self.primary_nodes)):
if (extra_handoffs == self.num_primary_nodes):
# all the primaries were skipped, and handoffs didn't help
self.app.logger.increment('handoff_all_count')
@ -1539,7 +1543,8 @@ class NodeIter(object):
self._node_provider = callback
def _node_gen(self):
for node in self.primary_nodes:
while self.primary_nodes:
node = self.primary_nodes.pop(0)
if not self.app.error_limited(node):
yield node
if not self.app.error_limited(node):

View File

@ -40,7 +40,7 @@ import sys
from greenlet import GreenletExit
from eventlet import GreenPile, sleep
from eventlet.queue import Queue
from eventlet.queue import Queue, Empty
from eventlet.timeout import Timeout
from swift import gettext_ as _
@ -68,7 +68,7 @@ from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, update_headers, bytes_to_skip, close_swift_conn, \
ByteCountEnforcer, source_key
ByteCountEnforcer
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \
@ -1400,7 +1400,7 @@ class ECAppIter(object):
# killed by contextpool
pass
except ChunkReadTimeout:
# unable to resume in GetOrHeadHandler
# unable to resume in ECFragGetter
self.logger.exception(_("Timeout fetching fragments for %r"),
quote(self.path))
except: # noqa
@ -1984,13 +1984,13 @@ class ECGetResponseBucket(object):
A helper class to encapsulate the properties of buckets in which fragment
getters and alternate nodes are collected.
"""
def __init__(self, policy, timestamp_str):
def __init__(self, policy, timestamp):
"""
:param policy: an instance of ECStoragePolicy
:param timestamp_str: a string representation of a timestamp
:param timestamp: a Timestamp, or None for a bucket of error responses
"""
self.policy = policy
self.timestamp_str = timestamp_str
self.timestamp = timestamp
self.gets = collections.defaultdict(list)
self.alt_nodes = collections.defaultdict(list)
self._durable = False
@ -2004,10 +2004,20 @@ class ECGetResponseBucket(object):
return self._durable
def add_response(self, getter, parts_iter):
"""
Add another response to this bucket. Response buckets can be for
fragments with the same timestamp, or for errors with the same status.
"""
headers = getter.last_headers
timestamp_str = headers.get('X-Backend-Timestamp',
headers.get('X-Timestamp'))
if timestamp_str:
# 404s will keep the most recent timestamp
self.timestamp = max(Timestamp(timestamp_str), self.timestamp)
if not self.gets:
self.status = getter.last_status
# stash first set of backend headers, which will be used to
# populate a client response
self.status = getter.last_status
# TODO: each bucket is for a single *data* timestamp, but sources
# in the same bucket may have different *metadata* timestamps if
# some backends have more recent .meta files than others. Currently
@ -2017,18 +2027,17 @@ class ECGetResponseBucket(object):
# recent metadata. We could alternatively choose to use the *newest*
# metadata headers for self.headers by selecting the source with
# the latest X-Timestamp.
self.headers = getter.last_headers
elif (self.timestamp_str is not None and # ie, not bad_bucket
getter.last_headers.get('X-Object-Sysmeta-Ec-Etag') !=
self.headers.get('X-Object-Sysmeta-Ec-Etag')):
self.headers = headers
elif headers.get('X-Object-Sysmeta-Ec-Etag') != \
self.headers.get('X-Object-Sysmeta-Ec-Etag'):
# Fragments at the same timestamp with different etags are never
# expected. If somehow it happens then ignore those fragments
# to avoid mixing fragments that will not reconstruct otherwise
# an exception from pyeclib is almost certain. This strategy leaves
# a possibility that a set of consistent frags will be gathered.
# expected and error buckets shouldn't have this header. If somehow
# this happens then ignore those responses to avoid mixing fragments
# that will not reconstruct; otherwise an exception from pyeclib is
# almost certain.
raise ValueError("ETag mismatch")
frag_index = getter.last_headers.get('X-Object-Sysmeta-Ec-Frag-Index')
frag_index = headers.get('X-Object-Sysmeta-Ec-Frag-Index')
frag_index = int(frag_index) if frag_index is not None else None
self.gets[frag_index].append((getter, parts_iter))
@ -2056,8 +2065,19 @@ class ECGetResponseBucket(object):
@property
def shortfall(self):
result = self.policy.ec_ndata - len(self.get_responses())
return max(result, 0)
"""
The number of additional responses needed to complete this bucket;
typically (ndata - resp_count).
If the bucket has no durable responses, shortfall is extended out to
replica count to ensure the proxy makes additional primary requests.
"""
resp_count = len(self.get_responses())
if self.durable or self.status == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE:
return max(self.policy.ec_ndata - resp_count, 0)
alt_count = min(self.policy.object_ring.replica_count - resp_count,
self.policy.ec_nparity)
return max([1, self.policy.ec_ndata - resp_count, alt_count])
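As a worked illustration of the arithmetic above, the standalone sketch below reproduces the non-durable shortfall for an assumed 10+4 policy with 14 primaries; the values agree with the expectations exercised in the new bucket test later in this change.

# Standalone sketch of the non-durable shortfall, assuming ec_ndata=10,
# ec_nparity=4 and replica_count=14 (a hypothetical 10+4 policy).
ec_ndata, ec_nparity, replica_count = 10, 4, 14

def non_durable_shortfall(resp_count):
    # max of: at least one more, enough to reach ndata, or up to nparity
    # extra primaries while no durable frag set has been found
    alt_count = min(replica_count - resp_count, ec_nparity)
    return max([1, ec_ndata - resp_count, alt_count])

for resp_count in (7, 10, 11, 13, 14):
    print(resp_count, non_durable_shortfall(resp_count))
# prints 4, 4, 3, 1, 1 respectively: even with ndata responses in hand the
# proxy keeps asking for more primaries until something durable shows up.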
@property
def shortfall_with_alts(self):
@ -2070,7 +2090,7 @@ class ECGetResponseBucket(object):
def __str__(self):
# return a string summarising bucket state, useful for debugging.
return '<%s, %s, %s, %s(%s), %s>' \
% (self.timestamp_str, self.status, self._durable,
% (self.timestamp.internal, self.status, self._durable,
self.shortfall, self.shortfall_with_alts, len(self.gets))
@ -2092,15 +2112,24 @@ class ECGetResponseCollection(object):
"""
self.policy = policy
self.buckets = {}
self.bad_buckets = {None: ECGetResponseBucket(self.policy, None)}
self.node_iter_count = 0
def _get_bucket(self, timestamp_str):
def _get_bucket(self, timestamp):
"""
:param timestamp_str: a string representation of a timestamp
:param timestamp: a Timestamp
:return: ECGetResponseBucket for given timestamp
"""
return self.buckets.setdefault(
timestamp_str, ECGetResponseBucket(self.policy, timestamp_str))
timestamp, ECGetResponseBucket(self.policy, timestamp))
def _get_bad_bucket(self, status):
"""
:param status: an HTTP response status code
:return: ECGetResponseBucket for given status
"""
return self.bad_buckets.setdefault(
status, ECGetResponseBucket(self.policy, None))
def add_response(self, get, parts_iter):
"""
@ -2112,13 +2141,31 @@ class ECGetResponseCollection(object):
:raises ValueError: if the response etag or status code values do not
match any values previously received for the same timestamp
"""
if is_success(get.last_status):
self.add_good_response(get, parts_iter)
else:
self.add_bad_resp(get, parts_iter)
def add_bad_resp(self, get, parts_iter):
bad_bucket = self._get_bad_bucket(get.last_status)
bad_bucket.add_response(get, parts_iter)
def add_good_response(self, get, parts_iter):
headers = get.last_headers
# Add the response to the appropriate bucket keyed by data file
# timestamp. Fall back to using X-Backend-Timestamp as key for object
# servers that have not been upgraded.
t_data_file = headers.get('X-Backend-Data-Timestamp')
t_obj = headers.get('X-Backend-Timestamp', headers.get('X-Timestamp'))
self._get_bucket(t_data_file or t_obj).add_response(get, parts_iter)
if t_data_file:
timestamp = Timestamp(t_data_file)
elif t_obj:
timestamp = Timestamp(t_obj)
else:
# Don't think this should ever come up in practice,
# but tests cover it
timestamp = None
self._get_bucket(timestamp).add_response(get, parts_iter)
# The node may also have alternate fragments indexes (possibly at
# different timestamps). For each list of alternate fragments indexes,
@ -2126,6 +2173,7 @@ class ECGetResponseCollection(object):
# list to that bucket's alternate nodes.
frag_sets = safe_json_loads(headers.get('X-Backend-Fragments')) or {}
for t_frag, frag_set in frag_sets.items():
t_frag = Timestamp(t_frag)
self._get_bucket(t_frag).add_alternate_nodes(get.node, frag_set)
# If the response includes a durable timestamp then mark that bucket as
# durable. Note that this may be a different bucket than the one this
@ -2137,7 +2185,7 @@ class ECGetResponseCollection(object):
# obj server not upgraded so assume this response's frag is durable
t_durable = t_obj
if t_durable:
self._get_bucket(t_durable).set_durable()
self._get_bucket(Timestamp(t_durable)).set_durable()
def _sort_buckets(self):
def key_fn(bucket):
@ -2150,35 +2198,65 @@ class ECGetResponseCollection(object):
return (bucket.durable,
bucket.shortfall <= 0,
-1 * bucket.shortfall_with_alts,
bucket.timestamp_str)
bucket.timestamp)
return sorted(self.buckets.values(), key=key_fn, reverse=True)
@property
def best_bucket(self):
"""
Return the best bucket in the collection.
Return the "best" bucket in the collection.
The "best" bucket is the newest timestamp with sufficient getters, or
the closest to having sufficient getters, unless it is bettered by a
bucket with potential alternate nodes.
If there are no good buckets we return the "least_bad" bucket.
:return: An instance of :class:`~ECGetResponseBucket` or None if there
are no buckets in the collection.
"""
sorted_buckets = self._sort_buckets()
if sorted_buckets:
return sorted_buckets[0]
return None
for bucket in sorted_buckets:
# tombstones will set bad_bucket.timestamp
not_found_bucket = self.bad_buckets.get(404)
if not_found_bucket and not_found_bucket.timestamp and \
bucket.timestamp < not_found_bucket.timestamp:
# "good bucket" is trumped by newer tombstone
continue
return bucket
return self.least_bad_bucket
@property
def least_bad_bucket(self):
"""
Return the bad_bucket with the smallest shortfall
"""
# we want "enough" 416s to prevent "extra" requests - but we keep
# digging on 404s
short, status = min((bucket.shortfall, status)
for status, bucket in self.bad_buckets.items()
if status != 404)
return self.bad_buckets[status]
@property
def shortfall(self):
best_bucket = self.best_bucket
shortfall = best_bucket.shortfall
return min(shortfall, self.least_bad_bucket.shortfall)
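A toy sketch of the selection above: least_bad_bucket is just a min() over (shortfall, status) pairs with 404s excluded, and the collection's shortfall is the smaller of the best bucket's and the least bad bucket's. The shortfall numbers below are invented for illustration.

# Toy sketch only; shortfall values are made up.
bad_buckets = {404: 4, 416: 0, 503: 7}   # status -> shortfall
best_bucket_shortfall = 2                # from the best good bucket

short, status = min((shortfall, status)
                    for status, shortfall in bad_buckets.items()
                    if status != 404)    # keep digging on 404s
assert (short, status) == (0, 416)       # "enough" 416s stop extra requests
collection_shortfall = min(best_bucket_shortfall, short)
assert collection_shortfall == 0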
@property
def durable(self):
return self.best_bucket.durable
def _get_frag_prefs(self):
# Construct the current frag_prefs list, with best_bucket prefs first.
frag_prefs = []
for bucket in self._sort_buckets():
if bucket.timestamp_str:
if bucket.timestamp:
exclusions = [fi for fi in bucket.gets if fi is not None]
prefs = {'timestamp': bucket.timestamp_str,
prefs = {'timestamp': bucket.timestamp.internal,
'exclude': exclusions}
frag_prefs.append(prefs)
@ -2237,44 +2315,34 @@ class ECGetResponseCollection(object):
return nodes.pop(0).copy()
def is_good_source(status):
"""
Indicates whether or not the request made to the backend found
what it was looking for.
:param status: the status code of the response from the backend
:returns: True if found, False if not
"""
if status == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE:
return True
return is_success(status) or is_redirection(status)
class ECFragGetter(object):
def __init__(self, app, req, server_type, node_iter, partition, path,
backend_headers, concurrency=1, client_chunk_size=None,
newest=None, header_provider=None):
def __init__(self, app, req, node_iter, partition, policy, path,
backend_headers, header_provider=None):
self.app = app
self.req = req
self.node_iter = node_iter
self.server_type = server_type
self.partition = partition
self.path = path
self.backend_headers = backend_headers
self.client_chunk_size = client_chunk_size
self.header_provider = header_provider
self.req_query_string = req.query_string
self.client_chunk_size = policy.fragment_size
self.skip_bytes = 0
self.bytes_used_from_backend = 0
self.used_nodes = []
self.used_source_etag = ''
self.concurrency = concurrency
self.node = None
self.header_provider = header_provider
self.latest_404_timestamp = Timestamp(0)
# stuff from request
self.req_method = req.method
self.req_path = req.path
self.req_query_string = req.query_string
if newest is None:
self.newest = config_true_value(req.headers.get('x-newest', 'f'))
else:
self.newest = newest
# populated when finding source
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
self.sources = []
# populated from response headers
self.start_byte = self.end_byte = self.length = None
def fast_forward(self, num_bytes):
"""
@ -2382,20 +2450,11 @@ class ECFragGetter(object):
e if e is not None else '')
for s, e in new_ranges)))
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
what it was looking for.
:param src: the response from the backend
:returns: True if found, False if not
"""
if self.server_type == 'Object' and src.status == 416:
return True
return is_success(src.status) or is_redirection(src.status)
def response_parts_iter(self, req):
source, node = self._get_source_and_node()
try:
source, node = next(self.source_and_node_iter)
except StopIteration:
return
it = None
if source:
it = self._get_response_parts_iter(req, node, source)
@ -2408,9 +2467,7 @@ class ECFragGetter(object):
try:
client_chunk_size = self.client_chunk_size
node_timeout = self.app.node_timeout
if self.server_type == 'Object':
node_timeout = self.app.recoverable_node_timeout
node_timeout = self.app.recoverable_node_timeout
# This is safe; it sets up a generator but does not call next()
# on it, so no IO is performed.
@ -2437,7 +2494,7 @@ class ECFragGetter(object):
parts_iter[0])
return (start_byte, end_byte, length, headers, part)
except ChunkReadTimeout:
new_source, new_node = self._get_source_and_node()
new_source, new_node = self._dig_for_source_and_node()
if new_source:
self.app.error_occurred(
node[0], _('Trying to read object during '
@ -2472,8 +2529,6 @@ class ECFragGetter(object):
nbytes -= len(chunk)
except (ChunkReadTimeout, ShortReadError):
exc_type, exc_value, exc_traceback = sys.exc_info()
if self.newest or self.server_type != 'Object':
raise
try:
self.fast_forward(self.bytes_used_from_backend)
except (HTTPException, ValueError):
@ -2481,7 +2536,7 @@ class ECFragGetter(object):
except RangeAlreadyComplete:
break
buf = b''
new_source, new_node = self._get_source_and_node()
new_source, new_node = self._dig_for_source_and_node()
if new_source:
self.app.error_occurred(
node[0], _('Trying to read object during '
@ -2627,33 +2682,26 @@ class ECFragGetter(object):
@property
def last_status(self):
if self.statuses:
return self.statuses[-1]
else:
return None
return self.status or HTTP_INTERNAL_SERVER_ERROR
@property
def last_headers(self):
if self.source_headers:
return HeaderKeyDict(self.source_headers[-1])
return HeaderKeyDict(self.source_headers)
else:
return None
return HeaderKeyDict()
def _make_node_request(self, node, node_timeout, logger_thread_locals):
self.app.logger.thread_locals = logger_thread_locals
if node in self.used_nodes:
return False
req_headers = dict(self.backend_headers)
# a request may be specialised with specific backend headers
if self.header_provider:
req_headers.update(self.header_provider())
ip, port = get_ip_port(node, req_headers)
req_headers.update(self.header_provider())
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(
ip, port, node['device'],
self.partition, self.req_method, self.path,
self.partition, 'GET', self.path,
headers=req_headers,
query_string=self.req_query_string)
self.app.set_node_timing(node, time.time() - start_node_timing)
@ -2664,134 +2712,69 @@ class ECFragGetter(object):
possible_source.swift_conn = conn
except (Exception, Timeout):
self.app.exception_occurred(
node, self.server_type,
node, 'Object',
_('Trying to %(method)s %(path)s') %
{'method': self.req_method, 'path': self.req_path})
return False
{'method': self.req.method, 'path': self.req.path})
return None
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
if self.is_good_source(possible_source):
# 404 if we know we don't have a synced copy
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
close_swift_conn(possible_source)
else:
if self.used_source_etag and \
self.used_source_etag != normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag',
src_headers.get('etag', ''))):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append([])
return False
# a possible source should only be added as a valid source
# if its timestamp is newer than previously found tombstones
ps_timestamp = Timestamp(
src_headers.get('x-backend-data-timestamp') or
src_headers.get('x-backend-timestamp') or
src_headers.get('x-put-timestamp') or
src_headers.get('x-timestamp') or 0)
if ps_timestamp >= self.latest_404_timestamp:
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(None)
self.source_headers.append(possible_source.getheaders())
self.sources.append((possible_source, node))
if not self.newest: # one good source is enough
return True
if 'handoff_index' in node and \
(is_server_error(possible_source.status) or
possible_source.status == HTTP_NOT_FOUND) and \
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
# throw out 5XX and 404s from handoff nodes unless the data is
# really on disk and had been DELETEd
return None
self.status = possible_source.status
self.reason = possible_source.reason
self.source_headers = possible_source.getheaders()
if is_good_source(possible_source.status):
self.body = None
return possible_source
else:
if 'handoff_index' in node and \
(is_server_error(possible_source.status) or
possible_source.status == HTTP_NOT_FOUND) and \
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
# throw out 5XX and 404s from handoff nodes unless the data is
# really on disk and had been DELETEd
return False
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(possible_source.read())
self.source_headers.append(possible_source.getheaders())
self.body = possible_source.read()
# if 404, record the timestamp. If a good source shows up, its
# timestamp will be compared to the latest 404.
# For now checking only on objects, but future work could include
# the same check for account and containers. See lp 1560574.
if self.server_type == 'Object' and \
possible_source.status == HTTP_NOT_FOUND:
hdrs = HeaderKeyDict(possible_source.getheaders())
ts = Timestamp(hdrs.get('X-Backend-Timestamp', 0))
if ts > self.latest_404_timestamp:
self.latest_404_timestamp = ts
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.app.error_occurred(
node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
'From Object Server') %
{'status': possible_source.status,
'body': self.bodies[-1][:1024],
'type': self.server_type})
return False
'body': self.body[:1024]})
return None
def _get_source_and_node(self):
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
self.sources = []
@property
def source_and_node_iter(self):
if not hasattr(self, '_source_and_node_iter'):
self._source_and_node_iter = self._source_and_node_gen()
return self._source_and_node_iter
nodes = GreenthreadSafeIterator(self.node_iter)
def _source_and_node_gen(self):
self.status = self.reason = self.body = self.source_headers = None
for node in self.node_iter:
source = self._make_node_request(
node, self.app.recoverable_node_timeout,
self.app.logger.thread_locals)
node_timeout = self.app.node_timeout
if self.server_type == 'Object' and not self.newest:
node_timeout = self.app.recoverable_node_timeout
if source:
self.node = node
yield source, node
else:
yield None, None
self.status = self.reason = self.body = self.source_headers = None
pile = GreenAsyncPile(self.concurrency)
for node in nodes:
pile.spawn(self._make_node_request, node, node_timeout,
self.app.logger.thread_locals)
_timeout = self.app.concurrency_timeout \
if pile.inflight < self.concurrency else None
if pile.waitfirst(_timeout):
break
else:
# ran out of nodes, see if any stragglers will finish
any(pile)
# this helps weed out any sucess status that were found before a 404
# and added to the list in the case of x-newest.
if self.sources:
self.sources = [s for s in self.sources
if source_key(s[0]) >= self.latest_404_timestamp]
if self.sources:
self.sources.sort(key=lambda s: source_key(s[0]))
source, node = self.sources.pop()
for src, _junk in self.sources:
close_swift_conn(src)
self.used_nodes.append(node)
src_headers = dict(
(k.lower(), v) for k, v in
source.getheaders())
# Save off the source etag so that, if we lose the connection
# and have to resume from a different node, we can be sure that
# we have the same object (replication) or a fragment archive
# from the same object (EC). Otherwise, if the cluster has two
# versions of the same object, we might end up switching between
# old and new mid-stream and giving garbage to the client.
self.used_source_etag = normalize_etag(src_headers.get(
'x-object-sysmeta-ec-etag', src_headers.get('etag', '')))
self.node = node
return source, node
def _dig_for_source_and_node(self):
# capture last used etag before continuation
used_etag = self.last_headers.get('X-Object-Sysmeta-EC-ETag')
for source, node in self.source_and_node_iter:
if source and is_good_source(source.status) and \
source.getheader('X-Object-Sysmeta-EC-ETag') == used_etag:
return source, node
return None, None
@ -2805,11 +2788,9 @@ class ECObjectController(BaseObjectController):
backend_headers = self.generate_request_headers(
req, additional=req.headers)
getter = ECFragGetter(self.app, req, 'Object', node_iter,
partition, req.swift_entity_path,
backend_headers,
client_chunk_size=policy.fragment_size,
newest=False, header_provider=header_provider)
getter = ECFragGetter(self.app, req, node_iter, partition,
policy, req.swift_entity_path, backend_headers,
header_provider=header_provider)
return (getter, getter.response_parts_iter(req))
def _convert_range(self, req, policy):
@ -2864,6 +2845,25 @@ class ECObjectController(BaseObjectController):
for s, e in new_ranges)
return range_specs
def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy,
buckets, feeder_q):
while True:
try:
feeder_q.get(timeout=self.app.concurrency_timeout)
except Empty:
if safe_iter.unsafe_iter.primaries_left:
# this will run async; if it ends up taking the last
# primary we won't find out until the next pass
pile.spawn(self._fragment_GET_request,
req, safe_iter, partition,
policy, buckets.get_extra_headers)
else:
# ran out of primaries
break
else:
# got a stop
break
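The feeder's pacing comes entirely from the queue: a get() that times out (raising Empty) means "start one more primary request", while any item put on the queue (the 'stop' pushed after the main loop further down) ends it. A stripped-down sketch of that pattern, with illustrative names rather than the real proxy objects:

# Stripped-down sketch of the pacing pattern; names here are illustrative,
# not Swift APIs. Empty (a timeout) feeds one more request, a queued item
# (or running out of work) stops the loop.
import eventlet
from eventlet.queue import Queue, Empty

def feeder(work_left, spawn_one, feeder_q, pace=0.05):
    while True:
        try:
            feeder_q.get(timeout=pace)   # block until stop or timeout
        except Empty:
            if work_left():
                spawn_one()              # timed out: feed another primary
            else:
                break                    # ran out of primaries
        else:
            break                        # got a stop

remaining = [3, 2, 1]
q = Queue()
gt = eventlet.spawn(feeder, lambda: bool(remaining),
                    lambda: remaining.pop(), q)
eventlet.sleep(0.2)  # let a few timeouts fire
q.put('stop')        # mirrors feeder_q.put('stop') in the controller below
gt.wait()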
def _get_or_head_response(self, req, node_iter, partition, policy):
update_etag_is_at_header(req, "X-Object-Sysmeta-Ec-Etag")
@ -2887,27 +2887,24 @@ class ECObjectController(BaseObjectController):
safe_iter = GreenthreadSafeIterator(node_iter)
# Sending the request concurrently to all nodes, and responding
# with the first response isn't something useful for EC as all
# nodes contain different fragments. Also EC has implemented it's
# own specific implementation of concurrent gets to ec_ndata nodes.
# So we don't need to worry about plumbing and sending a
# concurrency value to ECFragGetter.
with ContextPool(policy.ec_ndata) as pool:
ec_request_count = policy.ec_ndata + \
self.app.concurrent_ec_extra_requests
with ContextPool(ec_request_count) as pool:
pile = GreenAsyncPile(pool)
buckets = ECGetResponseCollection(policy)
node_iter.set_node_provider(buckets.provide_alternate_node)
# include what may well be an empty X-Backend-Fragment-Preferences
# header from the buckets.get_extra_headers to let the object
# server know that it is ok to return non-durable fragments
for _junk in range(policy.ec_ndata):
for node_count in range(ec_request_count):
pile.spawn(self._fragment_GET_request,
req, safe_iter, partition,
policy, buckets.get_extra_headers)
bad_bucket = ECGetResponseBucket(policy, None)
bad_bucket.set_durable()
best_bucket = None
feeder_q = None
if self.app.concurrent_gets:
feeder_q = Queue()
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
partition, policy, buckets, feeder_q)
extra_requests = 0
# max_extra_requests is an arbitrary hard limit for spawning extra
# getters in case some unforeseen scenario, or a misbehaving object
@ -2917,52 +2914,33 @@ class ECObjectController(BaseObjectController):
# be limit at most 2 * replicas.
max_extra_requests = (
(policy.object_ring.replica_count * 2) - policy.ec_ndata)
for get, parts_iter in pile:
if get.last_status is None:
# We may have spawned getters that find the node iterator
# has been exhausted. Ignore them.
# TODO: turns out that node_iter.nodes_left can bottom
# out at >0 when number of devs in ring is < 2* replicas,
# which definitely happens in tests and results in status
# of None. We should fix that but keep this guard because
# there is also a race between testing nodes_left/spawning
# a getter and an existing getter calling next(node_iter).
continue
try:
if is_success(get.last_status):
# 2xx responses are managed by a response collection
buckets.add_response(get, parts_iter)
else:
# all other responses are lumped into a single bucket
bad_bucket.add_response(get, parts_iter)
buckets.add_response(get, parts_iter)
except ValueError as err:
self.app.logger.error(
_("Problem with fragment response: %s"), err)
shortfall = bad_bucket.shortfall
best_bucket = buckets.best_bucket
if best_bucket:
shortfall = best_bucket.shortfall
if not best_bucket.durable and shortfall <= 0:
# be willing to go a *little* deeper, slowly
shortfall = 1
shortfall = min(shortfall, bad_bucket.shortfall)
if (extra_requests < max_extra_requests and
shortfall > pile._pending and
(node_iter.nodes_left > 0 or
buckets.has_alternate_node())):
# we need more matching responses to reach ec_ndata
# than we have pending gets, as long as we still have
# nodes in node_iter we can spawn another
if best_bucket.durable and best_bucket.shortfall <= 0:
# good enough!
break
requests_available = extra_requests < max_extra_requests and (
node_iter.nodes_left > 0 or buckets.has_alternate_node())
bad_resp = not is_good_source(get.last_status)
if requests_available and (
buckets.shortfall > pile._pending or bad_resp):
extra_requests += 1
pile.spawn(self._fragment_GET_request, req,
safe_iter, partition, policy,
buckets.get_extra_headers)
pile.spawn(self._fragment_GET_request,
req, safe_iter, partition,
policy, buckets.get_extra_headers)
if feeder_q:
feeder_q.put('stop')
# Put this back, since we *may* need it for kickoff()/_fix_response()
# (but note that _fix_ranges() may also pop it back off before then)
req.range = orig_range
if best_bucket and best_bucket.shortfall <= 0 and best_bucket.durable:
best_bucket = buckets.best_bucket
if best_bucket.shortfall <= 0 and best_bucket.durable:
# headers can come from any of the getters
resp_headers = best_bucket.headers
resp_headers.pop('Content-Range', None)
@ -2975,8 +2953,7 @@ class ECObjectController(BaseObjectController):
app_iter = ECAppIter(
req.swift_entity_path,
policy,
[parts_iter for
_getter, parts_iter in best_bucket.get_responses()],
[p_iter for _getter, p_iter in best_bucket.get_responses()],
range_specs, fa_length, obj_length,
self.app.logger)
resp = Response(
@ -3002,25 +2979,28 @@ class ECObjectController(BaseObjectController):
reasons = []
bodies = []
headers = []
for getter, _parts_iter in bad_bucket.get_responses():
if best_bucket and best_bucket.durable:
bad_resp_headers = HeaderKeyDict(getter.last_headers)
t_data_file = bad_resp_headers.get(
'X-Backend-Data-Timestamp')
t_obj = bad_resp_headers.get(
'X-Backend-Timestamp',
bad_resp_headers.get('X-Timestamp'))
bad_ts = Timestamp(t_data_file or t_obj or '0')
if bad_ts <= Timestamp(best_bucket.timestamp_str):
# We have reason to believe there's still good data
# out there, it's just currently unavailable
continue
statuses.extend(getter.statuses)
reasons.extend(getter.reasons)
bodies.extend(getter.bodies)
headers.extend(getter.source_headers)
for status, bad_bucket in buckets.bad_buckets.items():
for getter, _parts_iter in bad_bucket.get_responses():
if best_bucket.durable:
bad_resp_headers = getter.last_headers
t_data_file = bad_resp_headers.get(
'X-Backend-Data-Timestamp')
t_obj = bad_resp_headers.get(
'X-Backend-Timestamp',
bad_resp_headers.get('X-Timestamp'))
bad_ts = Timestamp(t_data_file or t_obj or '0')
if bad_ts <= best_bucket.timestamp:
# We have reason to believe there's still good data
# out there, it's just currently unavailable
continue
if getter.status:
statuses.append(getter.status)
reasons.append(getter.reason)
bodies.append(getter.body)
headers.append(getter.source_headers)
if not statuses and best_bucket and not best_bucket.durable:
if not statuses and is_success(best_bucket.status) and \
not best_bucket.durable:
# pretend that non-durable bucket was 404s
statuses.append(404)
reasons.append('404 Not Found')

View File

@ -263,6 +263,8 @@ class Application(object):
self.concurrent_gets = config_true_value(conf.get('concurrent_gets'))
self.concurrency_timeout = float(conf.get('concurrency_timeout',
self.conn_timeout))
self.concurrent_ec_extra_requests = int(
conf.get('concurrent_ec_extra_requests', 0))
value = conf.get('request_node_count', '2 * replicas').lower().split()
if len(value) == 1:
rnc_value = int(value[0])

View File

@ -21,29 +21,27 @@ import mock
import six
from swift.proxy import server as proxy_server
from swift.proxy.controllers.base import headers_to_container_info, \
headers_to_account_info, headers_to_object_info, get_container_info, \
get_cache_key, get_account_info, get_info, get_object_info, \
Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \
set_info_cache
set_info_cache, NodeIter
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
bytes_to_wsgi
from swift.common import exceptions
from swift.common.utils import split_path, ShardRange, Timestamp
from swift.common.utils import split_path, ShardRange, Timestamp, \
GreenthreadSafeIterator, GreenAsyncPile
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.http import is_success
from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection
from test.unit import (
fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, FakeLogger,
make_timestamp_iter,
mocked_http_conn)
from swift.proxy import server as proxy_server
fake_http_connect, FakeRing, FakeMemcache, PatchPolicies,
make_timestamp_iter, mocked_http_conn, patch_policies, debug_logger)
from swift.common.request_helpers import (
get_sys_meta_prefix, get_object_transient_sysmeta
)
from test.unit import patch_policies
class FakeResponse(object):
@ -179,13 +177,22 @@ class FakeCache(FakeMemcache):
return self.stub or self.store.get(key)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestFuncs(unittest.TestCase):
class BaseTest(unittest.TestCase):
def setUp(self):
self.app = proxy_server.Application(None,
account_ring=FakeRing(),
container_ring=FakeRing(),
logger=FakeLogger())
self.logger = debug_logger()
self.cache = FakeCache()
self.conf = {}
self.account_ring = FakeRing()
self.container_ring = FakeRing()
self.app = proxy_server.Application(self.conf,
logger=self.logger,
account_ring=self.account_ring,
container_ring=self.container_ring)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestFuncs(BaseTest):
def test_get_info_zero_recheck(self):
mock_cache = mock.Mock()
@ -1325,3 +1332,76 @@ class TestFuncs(unittest.TestCase):
self.assertIn('Failed to get container listing', warning_lines[0])
self.assertIn('/a/c', warning_lines[0])
self.assertFalse(warning_lines[1:])
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestNodeIter(BaseTest):
def test_iter_default_fake_ring(self):
for ring in (self.account_ring, self.container_ring):
self.assertEqual(ring.replica_count, 3.0)
node_iter = NodeIter(self.app, ring, 0)
self.assertEqual(6, node_iter.nodes_left)
self.assertEqual(3, node_iter.primaries_left)
count = 0
for node in node_iter:
count += 1
self.assertEqual(count, 3)
self.assertEqual(0, node_iter.primaries_left)
# default fake_ring has NO handoffs, so nodes_left is kind of a lie
self.assertEqual(3, node_iter.nodes_left)
def test_iter_with_handoffs(self):
ring = FakeRing(replicas=3, max_more_nodes=20) # handoffs available
policy = StoragePolicy(0, 'zero', object_ring=ring)
node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy)
self.assertEqual(6, node_iter.nodes_left)
self.assertEqual(3, node_iter.primaries_left)
primary_indexes = set()
handoff_indexes = []
count = 0
for node in node_iter:
if 'index' in node:
primary_indexes.add(node['index'])
else:
handoff_indexes.append(node['handoff_index'])
count += 1
self.assertEqual(count, 6)
self.assertEqual(0, node_iter.primaries_left)
self.assertEqual(0, node_iter.nodes_left)
self.assertEqual({0, 1, 2}, primary_indexes)
self.assertEqual([0, 1, 2], handoff_indexes)
def test_multi_iteration(self):
ring = FakeRing(replicas=8, max_more_nodes=20)
policy = StoragePolicy(0, 'ec', object_ring=ring)
# sanity
node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy)
self.assertEqual(16, len([n for n in node_iter]))
node_iter = NodeIter(self.app, policy.object_ring, 0, policy=policy)
self.assertEqual(16, node_iter.nodes_left)
self.assertEqual(8, node_iter.primaries_left)
pile = GreenAsyncPile(5)
def eat_node(node_iter):
return next(node_iter)
safe_iter = GreenthreadSafeIterator(node_iter)
for i in range(5):
pile.spawn(eat_node, safe_iter)
nodes = []
for node in pile:
nodes.append(node)
primary_indexes = {n['index'] for n in nodes}
self.assertEqual(5, len(primary_indexes))
self.assertEqual(3, node_iter.primaries_left)
# it's problematic we don't decrement nodes_left until we resume
self.assertEqual(12, node_iter.nodes_left)
for node in node_iter:
nodes.append(node)
self.assertEqual(17, len(nodes))

View File

@ -26,7 +26,8 @@ import json
from hashlib import md5
import mock
from eventlet import Timeout
from eventlet import Timeout, sleep
from eventlet.queue import Empty
import six
from six import StringIO
@ -2411,6 +2412,249 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
def test_GET_no_response_error(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect():
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
def test_feed_remaining_primaries(self):
controller = self.controller_cls(
self.app, 'a', 'c', 'o')
safe_iter = utils.GreenthreadSafeIterator(self.app.iter_nodes(
self.policy.object_ring, 0, policy=self.policy))
controller._fragment_GET_request = lambda *a, **k: next(safe_iter)
pile = utils.GreenAsyncPile(self.policy.ec_ndata)
for i in range(self.policy.ec_ndata):
pile.spawn(controller._fragment_GET_request)
req = swob.Request.blank('/v1/a/c/o')
feeder_q = mock.MagicMock()
def feeder_timeout(*a, **kw):
# simulate trampoline
sleep()
# timeout immediately
raise Empty
feeder_q.get.side_effect = feeder_timeout
controller.feed_remaining_primaries(
safe_iter, pile, req, 0, self.policy, mock.MagicMock(), feeder_q)
expected_call = mock.call(timeout=self.app.concurrency_timeout)
expected_num_calls = self.policy.ec_nparity + 1
self.assertEqual(feeder_q.get.call_args_list,
[expected_call] * expected_num_calls)
def test_GET_timeout(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.recoverable_node_timeout = 0.01
codes = [FakeStatus(404, response_sleep=1.0)] + \
[200] * (self.policy.ec_ndata)
with mocked_http_conn(*codes) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(self.policy.ec_ndata + 1, len(log.requests))
def test_GET_with_slow_primaries(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-743]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
status_codes = ([
FakeStatus(200, response_sleep=2.0),
] * self.policy.ec_nparity) + ([
FakeStatus(200),
] * self.policy.ec_ndata)
self.assertEqual(len(status_codes), len(ec_archive_bodies))
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_some_slow_primaries(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-289]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
slow_count = self.policy.ec_nparity
status_codes = ([
FakeStatus(200, response_sleep=2.0),
] * slow_count) + ([
FakeStatus(200),
] * (self.policy.ec_ndata - slow_count))
random.shuffle(status_codes)
status_codes.extend([
FakeStatus(200),
] * slow_count)
self.assertEqual(len(status_codes), len(ec_archive_bodies))
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_slow_nodes_and_failures(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-289]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
unused_resp = [
FakeStatus(200, response_sleep=2.0),
FakeStatus(200, response_sleep=2.0),
500,
416,
]
self.assertEqual(len(unused_resp), self.policy.ec_nparity)
status_codes = (
[200] * (self.policy.ec_ndata - 4)) + unused_resp
self.assertEqual(len(status_codes), self.policy.ec_ndata)
# random.shuffle(status_codes)
# make up for the failures
status_codes.extend([200] * self.policy.ec_nparity)
self.assertEqual(len(status_codes), len(ec_archive_bodies))
bodies_with_errors = []
for code, body in zip(status_codes, ec_archive_bodies):
if code == 500:
bodies_with_errors.append('Kaboom')
elif code == 416:
bodies_with_errors.append('That Range is no.')
else:
bodies_with_errors.append(body)
with mocked_http_conn(*status_codes, body_iter=bodies_with_errors,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_one_slow_frag_lane(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
req = swift.common.swob.Request.blank('/v1/a/c/o')
self.app.concurrent_gets = True
self.app.concurrency_timeout = 0.01
status_codes = [
FakeStatus(200, response_sleep=2.0),
] + ([
FakeStatus(200),
] * (self.policy.ec_ndata - 1))
random.shuffle(status_codes)
status_codes.extend([
FakeStatus(200, response_sleep=2.0),
FakeStatus(200, response_sleep=2.0),
FakeStatus(200, response_sleep=2.0),
FakeStatus(200),
])
self.assertEqual(len(status_codes), len(ec_archive_bodies))
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.ec_n_unique_fragments)
def test_GET_with_concurrent_ec_extra_requests(self):
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
ts = self.ts()
headers = []
for i, body in enumerate(ec_archive_bodies):
headers.append({
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(body),
'X-Object-Sysmeta-Ec-Frag-Index':
self.policy.get_backend_index(i),
'X-Backend-Timestamp': ts.internal,
'X-Timestamp': ts.normal,
'X-Backend-Durable-Timestamp': ts.internal,
'X-Backend-Data-Timestamp': ts.internal,
})
self.app.concurrent_ec_extra_requests = self.policy.ec_nparity - 1
req = swift.common.swob.Request.blank('/v1/a/c/o')
status_codes = [200] * (self.policy.object_ring.replicas - 1)
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(log.requests),
self.policy.object_ring.replicas - 1)
self.assertEqual(resp.body, test_data)
def test_GET_with_body(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
# turn a real body into fragments
@ -2576,7 +2820,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response) as log:
with mock.patch('swift.proxy.server.shuffle', lambda n: n), \
capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
@ -2589,17 +2834,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
collected_responses[etag].add(index)
# because the primary nodes are shuffled, it's possible the proxy
# didn't even notice the missed overwrite frag - but it might have
self.assertLessEqual(len(log), self.policy.ec_ndata + 1)
self.assertLessEqual(len(collected_responses), 2)
# ... regardless we should never need to fetch more than ec_ndata
# frags for any given etag
for etag, frags in collected_responses.items():
self.assertLessEqual(len(frags), self.policy.ec_ndata,
'collected %s frags for etag %s' % (
len(frags), etag))
self.assertEqual(len(log), self.policy.ec_ndata + 1)
expected = {
obj1['etag']: 1,
obj2['etag']: self.policy.ec_ndata,
}
self.assertEqual(expected, {
e: len(f) for e, f in collected_responses.items()})
def test_GET_with_many_missed_overwrite_will_need_handoff(self):
obj1 = self._make_ec_object_stub(pattern='obj1')
@ -2857,7 +3098,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
collected_indexes[fi].append(conn)
self.assertEqual(len(collected_indexes), 7)
def test_GET_with_mixed_nondurable_frags_and_no_quorum_will_503(self):
def test_GET_with_mixed_nondurable_frags_and_will_404(self):
# all nodes have a frag but there is no one set that reaches quorum,
# which means there is no backend 404 response, but proxy should still
# return 404 rather than 503
@ -2919,10 +3160,72 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
collected_etags)
self.assertEqual({200}, collected_status)
def test_GET_with_mixed_frags_and_no_quorum_will_503(self):
def test_GET_with_mixed_durable_and_nondurable_frags_will_503(self):
# all nodes have a frag but there is no one set that reaches quorum,
# but since they're all marked durable (so we *should* be able to
# reconstruct), proxy will 503
# but since one is marked durable we *should* be able to reconstruct,
# so proxy should 503
obj1 = self._make_ec_object_stub(pattern='obj1')
obj2 = self._make_ec_object_stub(pattern='obj2')
obj3 = self._make_ec_object_stub(pattern='obj3')
obj4 = self._make_ec_object_stub(pattern='obj4')
node_frags = [
{'obj': obj1, 'frag': 0, 'durable': False},
{'obj': obj2, 'frag': 0, 'durable': False},
{'obj': obj3, 'frag': 0, 'durable': False},
{'obj': obj1, 'frag': 1, 'durable': False},
{'obj': obj2, 'frag': 1, 'durable': False},
{'obj': obj3, 'frag': 1, 'durable': False},
{'obj': obj1, 'frag': 2, 'durable': False},
{'obj': obj2, 'frag': 2, 'durable': False},
{'obj': obj3, 'frag': 2, 'durable': False},
{'obj': obj1, 'frag': 3, 'durable': False},
{'obj': obj2, 'frag': 3, 'durable': False},
{'obj': obj3, 'frag': 3, 'durable': False},
{'obj': obj1, 'frag': 4, 'durable': False},
{'obj': obj2, 'frag': 4, 'durable': False},
{'obj': obj3, 'frag': 4, 'durable': False},
{'obj': obj1, 'frag': 5, 'durable': False},
{'obj': obj2, 'frag': 5, 'durable': False},
{'obj': obj3, 'frag': 5, 'durable': False},
{'obj': obj1, 'frag': 6, 'durable': False},
{'obj': obj2, 'frag': 6, 'durable': False},
{'obj': obj3, 'frag': 6, 'durable': False},
{'obj': obj1, 'frag': 7, 'durable': False},
{'obj': obj2, 'frag': 7, 'durable': False},
{'obj': obj3, 'frag': 7},
{'obj': obj1, 'frag': 8, 'durable': False},
{'obj': obj2, 'frag': 8, 'durable': False},
{'obj': obj3, 'frag': 8, 'durable': False},
{'obj': obj4, 'frag': 8, 'durable': False},
]
fake_response = self._fake_ec_node_response(node_frags)
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
collected_etags = set()
collected_status = set()
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
collected_etags.add(etag)
collected_status.add(conn.resp.status)
# default node_iter will exhaust at 2 * replicas
self.assertEqual(len(log), 2 * self.replicas())
self.assertEqual(
{obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
collected_etags)
self.assertEqual({200}, collected_status)
def test_GET_with_mixed_durable_frags_and_no_quorum_will_503(self):
# all nodes have a frag but there is no one set that reaches quorum,
# and since at least one is marked durable we *should* be able to
# reconstruct, so proxy will 503
obj1 = self._make_ec_object_stub(pattern='obj1')
obj2 = self._make_ec_object_stub(pattern='obj2')
obj3 = self._make_ec_object_stub(pattern='obj3')
@ -3001,7 +3304,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
{'obj': obj1, 'frag': 11, 'durable': False}, # parity
{'obj': obj1, 'frag': 12, 'durable': False}, # parity
{'obj': obj1, 'frag': 13, 'durable': False}, # parity
] # handoffs not used in this scenario
] + [[]] * self.replicas() # handoffs all 404
fake_response = self._fake_ec_node_response(list(node_frags))
@ -3013,9 +3316,11 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.headers['etag'], obj1['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj1['etag'])
self.assertEqual(self.policy.ec_ndata, len(log))
self.assertGreaterEqual(len(log), self.policy.ec_ndata)
collected_durables = []
for conn in log:
if not conn.resp.headers.get('X-Backend-Data-Timestamp'):
continue
if (conn.resp.headers.get('X-Backend-Durable-Timestamp')
== conn.resp.headers.get('X-Backend-Data-Timestamp')):
collected_durables.append(conn)
@ -3044,7 +3349,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
{'obj': obj1, 'frag': 11, 'durable': False}, # parity
{'obj': obj1, 'frag': 12, 'durable': False}, # parity
{'obj': obj1, 'frag': 13, 'durable': False}, # parity
] # handoffs not used in this scenario
] + [[]] * self.replicas() # handoffs all 404
fake_response = self._fake_ec_node_response(list(node_frags))
@ -3058,6 +3363,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
collected_durables = []
for conn in log:
if not conn.resp.headers.get('X-Backend-Data-Timestamp'):
continue
if (conn.resp.headers.get('X-Backend-Durable-Timestamp')
== conn.resp.headers.get('X-Backend-Data-Timestamp')):
collected_durables.append(conn)
@ -3231,6 +3538,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
# min: proxy will GET 10 non-durable obj1 frags and then 10 obj frags
self.assertGreaterEqual(len(log), 2 * self.policy.ec_ndata)
def test_GET_with_missing_durables_and_older_obscured_durables(self):
# scenario: obj3 has 14 frags but only 2 are durable and these are
# obscured by two non-durable frags of obj1. There is also a single
# non-durable frag of obj2. The proxy will need to do at least 10
@ -3259,7 +3567,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
[{'obj': obj3, 'frag': 11, 'durable': False}],
[{'obj': obj3, 'frag': 12, 'durable': False}],
[{'obj': obj3, 'frag': 13, 'durable': False}],
]
] + [[]] * self.replicas() # handoffs 404
fake_response = self._fake_ec_node_response(list(node_frags))
@ -3271,7 +3579,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.headers['etag'], obj3['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj3['etag'])
self.assertGreaterEqual(len(log), self.policy.ec_ndata + 1)
self.assertLessEqual(len(log), self.policy.ec_ndata + 4)
self.assertLessEqual(len(log), (self.policy.ec_ndata * 2) + 1)
def test_GET_with_missing_durables_and_older_non_durables(self):
# scenario: non-durable frags of newer obj1 obscure all frags
@ -3453,7 +3761,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
StubResponse(416, frag_index=4),
StubResponse(416, frag_index=5),
StubResponse(416, frag_index=6),
# sneak in bogus extra responses
# sneak a couple bogus extra responses
StubResponse(404),
StubResponse(206, frag_index=8),
# and then just "enough" more 416's
@ -3471,8 +3779,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 416)
# ec_ndata responses that must agree, plus the bogus extras
self.assertEqual(len(log), self.policy.ec_ndata + 2)
# we're going to engage ndata primaries, plus the bogus extra
# self.assertEqual(len(log), self.policy.ec_ndata + 2)
self.assertEqual([c.resp.status for c in log],
([416] * 7) + [404, 206] + ([416] * 3))
def test_GET_with_missing_and_range_unsatisifiable(self):
responses = [ # not quite ec_ndata frags on primaries
@ -3700,7 +4010,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
status_codes, body_iter, headers = zip(*responses + [
(404, [b''], {}) for i in range(
self.policy.object_ring.max_more_nodes)])
with set_http_connect(*status_codes, body_iter=body_iter,
with mocked_http_conn(*status_codes, body_iter=body_iter,
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
@ -3708,8 +4018,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
# resume but won't be able to give us all the right bytes
self.assertNotEqual(md5(resp.body).hexdigest(), etag)
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(self.replicas(), len(error_lines))
nparity = self.policy.ec_nparity
self.assertGreater(len(error_lines), nparity)
for line in error_lines[:nparity]:
self.assertIn('retrying', line)
for line in error_lines[nparity:]:
@ -3720,7 +4030,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
test_data = (b'test' * segment_size)[:-333]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
headers = {'X-Object-Sysmeta-Ec-Etag': etag}
headers = {
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(test_data),
}
self.app.recoverable_node_timeout = 0.05
# first one is slow
responses = [(200, SlowBody(ec_archive_bodies[0], 0.1),
@ -3737,11 +4050,100 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertTrue(md5(resp.body).hexdigest(), etag)
self.assertEqual(md5(resp.body).hexdigest(), etag)
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertIn('retrying', error_lines[0])
def test_GET_read_timeout_resume_mixed_etag(self):
segment_size = self.policy.ec_segment_size
test_data2 = (b'blah1' * segment_size)[:-333]
test_data1 = (b'test' * segment_size)[:-333]
etag2 = md5(test_data2).hexdigest()
etag1 = md5(test_data1).hexdigest()
ec_archive_bodies2 = self._make_ec_archive_bodies(test_data2)
ec_archive_bodies1 = self._make_ec_archive_bodies(test_data1)
headers2 = {'X-Object-Sysmeta-Ec-Etag': etag2,
'X-Object-Sysmeta-Ec-Content-Length': len(test_data2),
'X-Backend-Timestamp': self.ts().internal}
headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1,
'X-Object-Sysmeta-Ec-Content-Length': len(test_data1),
'X-Backend-Timestamp': self.ts().internal}
responses = [
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[1], self._add_frag_index(1, headers1)),
# 404
(404, [b''], {}),
# etag1
(200, SlowBody(ec_archive_bodies1[3], 0.1), self._add_frag_index(
3, headers1)),
# etag2
(200, ec_archive_bodies2[4], self._add_frag_index(4, headers2)),
# etag1
(200, ec_archive_bodies1[5], self._add_frag_index(5, headers1)),
# etag2
(200, ec_archive_bodies2[6], self._add_frag_index(6, headers2)),
# etag1
(200, ec_archive_bodies1[7], self._add_frag_index(7, headers1)),
# etag2
(200, ec_archive_bodies2[8], self._add_frag_index(8, headers2)),
# etag1
(200, SlowBody(ec_archive_bodies1[9], 0.1), self._add_frag_index(
9, headers1)),
# etag2
(200, ec_archive_bodies2[10], self._add_frag_index(10, headers2)),
# etag1
(200, ec_archive_bodies1[11], self._add_frag_index(11, headers1)),
# etag2
(200, ec_archive_bodies2[12], self._add_frag_index(12, headers2)),
# 404
(404, [b''], {}),
# handoffs start here
# etag2
(200, ec_archive_bodies2[0], self._add_frag_index(0, headers2)),
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[2], self._add_frag_index(2, headers1)),
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[4], self._add_frag_index(4, headers1)),
# etag2
(200, ec_archive_bodies2[1], self._add_frag_index(1, headers2)),
# etag1
(200, ec_archive_bodies1[6], self._add_frag_index(6, headers1)),
# etag2
(200, ec_archive_bodies2[7], self._add_frag_index(7, headers2)),
# etag1
(200, ec_archive_bodies1[8], self._add_frag_index(8, headers1)),
# resume requests start here
# 404
(404, [b''], {}),
# etag2
(200, ec_archive_bodies2[3], self._add_frag_index(3, headers2)),
# 404
(404, [b''], {}),
# etag1
(200, ec_archive_bodies1[10], self._add_frag_index(10, headers1)),
# etag1
(200, ec_archive_bodies1[12], self._add_frag_index(12, headers1)),
]
self.app.recoverable_node_timeout = 0.01
req = swob.Request.blank('/v1/a/c/o')
status_codes, body_iter, headers = zip(*responses)
with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(md5(resp.body).hexdigest(), etag1)
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(2, len(error_lines))
for line in error_lines:
self.assertIn('retrying', line)
def test_fix_response_HEAD(self):
headers = {'X-Object-Sysmeta-Ec-Content-Length': '10',
'X-Object-Sysmeta-Ec-Etag': 'foo'}
@ -3825,6 +4227,43 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.etag, body_etag)
self.assertEqual(resp.headers['Accept-Ranges'], 'bytes')
def test_non_durable_ec_response_bucket(self):
ts = self.ts()
bucket = obj.ECGetResponseBucket(self.policy, ts)
self.assertEqual(bucket.shortfall, self.policy.ec_ndata)
for i in range(1, self.policy.ec_ndata - self.policy.ec_nparity + 1):
stub_getter = mock.MagicMock(last_status=200, last_headers={
'X-Backend-Timestamp': ts.internal,
'X-Object-Sysmeta-Ec-Etag': 'the-etag',
'X-Object-Sysmeta-Ec-Frag-Index': str(i),
})
bucket.add_response(stub_getter, None)
self.assertEqual(bucket.shortfall, self.policy.ec_ndata - i)
self.assertEqual(bucket.shortfall, self.policy.ec_nparity)
self.assertFalse(bucket.durable)
expectations = (
4, # 7
4, # 8
4, # 9
4, # 10
3, # 11
2, # 12
1, # 13
1, # 14
)
for i, expected in zip(range(
self.policy.ec_ndata - self.policy.ec_nparity + 1,
self.policy.object_ring.replica_count + 1), expectations):
stub_getter = mock.MagicMock(last_status=200, last_headers={
'X-Backend-Timestamp': ts.internal,
'X-Object-Sysmeta-Ec-Etag': 'the-etag',
'X-Object-Sysmeta-Ec-Frag-Index': str(i),
})
bucket.add_response(stub_getter, None)
msg = 'With %r resp, expected shortfall %s != %s' % (
bucket.gets.keys(), expected, bucket.shortfall)
self.assertEqual(bucket.shortfall, expected, msg)
class TestECFunctions(unittest.TestCase):
def test_chunk_transformer(self):
@ -3969,7 +4408,8 @@ class TestECDuplicationObjController(
# the backend requests will stop at enough ec_ndata responses
self.assertEqual(
len(frags), self.policy.ec_ndata,
'collected %s frags for etag %s' % (len(frags), etag))
'collected %s frags (expected %s) for etag %s' % (
len(frags), self.policy.ec_ndata, etag))
# TODO: actually "frags" in node_frags is meaning "node_index" right now
# in following tests. Reconsidering the name and semantics change needed.

View File

@ -1337,6 +1337,20 @@ class TestProxyServerLoading(unittest.TestCase):
self.assertEqual(app.post_quorum_timeout, 0.3)
self.assertEqual(app.concurrency_timeout, 0.2)
def test_concurrent_ec_options(self):
conf = {
'concurrent_gets': 'on',
'concurrency_timeout': '0.5',
'concurrent_ec_extra_requests': '4',
}
for policy in POLICIES:
policy.object_ring = FakeRing()
app = proxy_server.Application(conf, debug_logger(),
FakeRing(), FakeRing())
self.assertEqual(app.concurrent_ec_extra_requests, 4)
self.assertEqual(app.concurrent_gets, True)
self.assertEqual(app.concurrency_timeout, 0.5)
def test_load_policy_rings(self):
for policy in POLICIES:
self.assertFalse(policy.object_ring)
@ -4687,12 +4701,21 @@ class TestReplicatedObjectController(
object_ring.max_more_nodes = 0
def test_iter_nodes_calls_sort_nodes(self):
with mock.patch.object(self.app, 'sort_nodes') as sort_nodes:
called = []
def fake_sort_nodes(nodes, **kwargs):
# caller might mutate the list we return during iteration; we're
# interested in the value as of call time
called.append(mock.call(list(nodes), **kwargs))
return nodes
with mock.patch.object(self.app, 'sort_nodes',
side_effect=fake_sort_nodes):
object_ring = self.app.get_object_ring(None)
for node in self.app.iter_nodes(object_ring, 0):
pass
sort_nodes.assert_called_once_with(
object_ring.get_part_nodes(0), policy=None)
self.assertEqual(called, [
mock.call(object_ring.get_part_nodes(0), policy=None)
])
def test_iter_nodes_skips_error_limited(self):
with mock.patch.object(self.app, 'sort_nodes',