Enable object server to return non-durable data
This patch improves EC GET response handling: - The proxy no longer requires all object servers to have a durable file for the fragment archive that they return in response to a GET. The proxy will now be satisfied if just one object server has a durable file at the same timestamp as fragments from other object servers. This means that the proxy can now successfully GET an object that had missing durable files when it was PUT. - The proxy will now ensure that it has a quorum of *unique* fragment indexes from object servers before considering a GET to be successful. - The proxy is now able to fetch multiple fragment archives having different indexes from the same node. This enables the proxy to successfully GET an object that has some fragments that have landed on the same node, for example after a rebalance. This new behavior is facilitated by an exchange of new headers on a GET request and response between the proxy and object servers. An object server now includes with a GET (or HEAD) response: - X-Backend-Fragments: the value of this describes all fragment archive indexes that the server has for the object by encoding a map of the form: timestamp -> <list of fragment indexes> - X-Backend-Durable-Timestamp: the value of this is the internal form of the timestamp of the newest durable file that was found, if any. - X-Backend-Data-Timestamp: the value of this is the internal form of the timestamp of the data file that was used to construct the diskfile. A proxy server now includes with a GET request: - X-Backend-Fragment-Preferences: the value of this describes the proxy's current preference with respect to those fragments that it would have object servers return. It encodes a list of timestamp, and for each timestamp a list of fragment indexes that the proxy does NOT require (because it already has them). The presence of a X-Backend-Fragment-Preferences header (even one with an empty list as its value) will cause the object server to search for the most appropriate fragment to return, disregarding the existence or not of any durable file. The object server assumes that the proxy knows best. Closes-Bug: 1469094 Closes-Bug: 1484598 Change-Id: I2310981fd1c4622ff5d1a739cbcc59637ffe3fc3 Co-Authored-By: Paul Luse <paul.e.luse@intel.com> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
This commit is contained in:
parent
f414c154e0
commit
44a861787a
@ -461,13 +461,9 @@ A few key points on the .durable file:
|
||||
|
||||
* The .durable file means \"the matching .data file for this has sufficient
|
||||
fragment archives somewhere, committed, to reconstruct the object\".
|
||||
* The Proxy Server will never have knowledge, either on GET or HEAD, of the
|
||||
existence of a .data file on an object server if it does not have a matching
|
||||
.durable file.
|
||||
* The object server will never return a .data that does not have a matching
|
||||
.durable.
|
||||
* When a proxy does a GET, it will only receive fragment archives that have
|
||||
enough present somewhere to be reconstructed.
|
||||
* When a proxy does a GET, it will require at least one object server to
|
||||
respond with a fragment archive that has a matching `.durable` file before
|
||||
reconstructing and returning the object to the client.
|
||||
|
||||
Partial PUT Failures
|
||||
====================
|
||||
@ -500,17 +496,39 @@ The GET for EC is different enough from replication that subclassing the
|
||||
`BaseObjectController` to the `ECObjectController` enables an efficient way to
|
||||
implement the high level steps described earlier:
|
||||
|
||||
#. The proxy server makes simultaneous requests to participating nodes.
|
||||
#. As soon as the proxy has the fragments it needs, it calls on PyECLib to
|
||||
decode the data.
|
||||
#. The proxy server makes simultaneous requests to `ec_ndata` primary object
|
||||
server nodes with goal of finding a set of `ec_ndata` distinct EC archives
|
||||
at the same timestamp, and an indication from at least one object server
|
||||
that a `<timestamp>.durable` file exists for that timestamp. If this goal is
|
||||
not achieved with the first `ec_ndata` requests then the proxy server
|
||||
continues to issue requests to the remaining primary nodes and then handoff
|
||||
nodes.
|
||||
#. As soon as the proxy server has found a usable set of `ec_ndata` EC
|
||||
archives, it starts to call PyECLib to decode fragments as they are returned
|
||||
by the object server nodes.
|
||||
#. The proxy server creates Etag and content length headers for the client
|
||||
response since each EC archive's metadata is valid only for that archive.
|
||||
#. The proxy streams the decoded data it has back to the client.
|
||||
#. Repeat until the proxy is done sending data back to the client.
|
||||
|
||||
The GET path will attempt to contact all nodes participating in the EC scheme,
|
||||
if not enough primaries respond then handoffs will be contacted just as with
|
||||
replication. Etag and content length headers are updated for the client
|
||||
response following reconstruction as the individual fragment archives metadata
|
||||
is valid only for that fragment archive.
|
||||
Note that the proxy does not require all objects servers to have a `.durable`
|
||||
file for the EC archive that they return in response to a GET. The proxy
|
||||
will be satisfied if just one object server has a `.durable` file at the same
|
||||
timestamp as EC archives returned from other object servers. This means
|
||||
that the proxy can successfully GET an object that had missing `.durable` files
|
||||
when it was PUT (i.e. a partial PUT failure occurred).
|
||||
|
||||
Note also that an object server may inform the proxy server that it has more
|
||||
than one EC archive for different timestamps and/or fragment indexes, which may
|
||||
cause the proxy server to issue multiple requests for distinct EC archives to
|
||||
that object server. (This situation can temporarily occur after a ring
|
||||
rebalance when a handoff node storing an archive has become a primary node and
|
||||
received its primary archive but not yet moved the handoff archive to its
|
||||
primary node.)
|
||||
|
||||
The proxy may receive EC archives having different timestamps, and may
|
||||
receive several EC archives having the same index. The proxy therefore
|
||||
ensures that it has sufficient EC archives with the same timestamp
|
||||
and distinct fragment indexes before considering a GET to be successful.
|
||||
|
||||
Object Server
|
||||
-------------
|
||||
|
@ -4089,3 +4089,12 @@ def o_tmpfile_supported():
|
||||
return all([linkat.available,
|
||||
platform.system() == 'Linux',
|
||||
LooseVersion(platform.release()) >= LooseVersion('3.16')])
|
||||
|
||||
|
||||
def safe_json_loads(value):
|
||||
if value:
|
||||
try:
|
||||
return json.loads(value)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
return None
|
||||
|
@ -827,15 +827,20 @@ class BaseDiskFileManager(object):
|
||||
self._process_ondisk_files(exts, results, **kwargs)
|
||||
|
||||
# set final choice of files
|
||||
if exts.get('.ts'):
|
||||
if 'data_info' in results:
|
||||
if exts.get('.meta'):
|
||||
# only report a meta file if a data file has been chosen
|
||||
results['meta_info'] = exts['.meta'][0]
|
||||
ctype_info = exts['.meta'].pop()
|
||||
if (ctype_info['ctype_timestamp']
|
||||
> results['data_info']['timestamp']):
|
||||
results['ctype_info'] = ctype_info
|
||||
elif exts.get('.ts'):
|
||||
# only report a ts file if a data file has not been chosen
|
||||
# (ts files will commonly already have been removed from exts if
|
||||
# a data file was chosen, but that may not be the case if
|
||||
# non-durable EC fragment(s) were chosen, hence the elif here)
|
||||
results['ts_info'] = exts['.ts'][0]
|
||||
if 'data_info' in results and exts.get('.meta'):
|
||||
# only report a meta file if a data file has been chosen
|
||||
results['meta_info'] = exts['.meta'][0]
|
||||
ctype_info = exts['.meta'].pop()
|
||||
if (ctype_info['ctype_timestamp']
|
||||
> results['data_info']['timestamp']):
|
||||
results['ctype_info'] = ctype_info
|
||||
|
||||
# set ts_file, data_file, meta_file and ctype_file with path to
|
||||
# chosen file or None
|
||||
@ -2635,6 +2640,41 @@ class ECDiskFile(BaseDiskFile):
|
||||
self._frag_index = None
|
||||
if frag_index is not None:
|
||||
self._frag_index = self.manager.validate_fragment_index(frag_index)
|
||||
self._frag_prefs = self._validate_frag_prefs(kwargs.get('frag_prefs'))
|
||||
self._durable_frag_set = None
|
||||
|
||||
def _validate_frag_prefs(self, frag_prefs):
|
||||
"""
|
||||
Validate that frag_prefs is a list of dicts containing expected keys
|
||||
'timestamp' and 'exclude'. Convert timestamp values to Timestamp
|
||||
instances and validate that exclude values are valid fragment indexes.
|
||||
|
||||
:param frag_prefs: data to validate, should be a list of dicts.
|
||||
:raise DiskFileError: if the frag_prefs data is invalid.
|
||||
:return: a list of dicts with converted and validated values.
|
||||
"""
|
||||
# We *do* want to preserve an empty frag_prefs list because it
|
||||
# indicates that a durable file is not required.
|
||||
if frag_prefs is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return [
|
||||
{'timestamp': Timestamp(pref['timestamp']),
|
||||
'exclude': [self.manager.validate_fragment_index(fi)
|
||||
for fi in pref['exclude']]}
|
||||
for pref in frag_prefs]
|
||||
except ValueError as e:
|
||||
raise DiskFileError(
|
||||
'Bad timestamp in frag_prefs: %r: %s'
|
||||
% (frag_prefs, e))
|
||||
except DiskFileError as e:
|
||||
raise DiskFileError(
|
||||
'Bad fragment index in frag_prefs: %r: %s'
|
||||
% (frag_prefs, e))
|
||||
except (KeyError, TypeError) as e:
|
||||
raise DiskFileError(
|
||||
'Bad frag_prefs: %r: %s' % (frag_prefs, e))
|
||||
|
||||
@property
|
||||
def durable_timestamp(self):
|
||||
@ -2671,13 +2711,14 @@ class ECDiskFile(BaseDiskFile):
|
||||
def _get_ondisk_files(self, files):
|
||||
"""
|
||||
The only difference between this method and the replication policy
|
||||
DiskFile method is passing in the frag_index kwarg to our manager's
|
||||
get_ondisk_files method.
|
||||
DiskFile method is passing in the frag_index and frag_prefs kwargs to
|
||||
our manager's get_ondisk_files method.
|
||||
|
||||
:param files: list of file names
|
||||
"""
|
||||
self._ondisk_info = self.manager.get_ondisk_files(
|
||||
files, self._datadir, frag_index=self._frag_index)
|
||||
files, self._datadir, frag_index=self._frag_index,
|
||||
frag_prefs=self._frag_prefs)
|
||||
return self._ondisk_info
|
||||
|
||||
def purge(self, timestamp, frag_index):
|
||||
@ -2804,14 +2845,49 @@ class ECDiskFileManager(BaseDiskFileManager):
|
||||
rv['frag_index'] = None
|
||||
return rv
|
||||
|
||||
def _process_ondisk_files(self, exts, results, frag_index=None, **kwargs):
|
||||
def _process_ondisk_files(self, exts, results, frag_index=None,
|
||||
frag_prefs=None, **kwargs):
|
||||
"""
|
||||
Implement EC policy specific handling of .data and .durable files.
|
||||
|
||||
If a frag_prefs keyword arg is provided then its value may determine
|
||||
which fragment index at which timestamp is used to construct the
|
||||
diskfile. The value of frag_prefs should be a list. Each item in the
|
||||
frag_prefs list should be a dict that describes per-timestamp
|
||||
preferences using the following items:
|
||||
|
||||
* timestamp: An instance of :class:`~swift.common.utils.Timestamp`.
|
||||
* exclude: A list of valid fragment indexes (i.e. whole numbers)
|
||||
that should be EXCLUDED when choosing a fragment at the
|
||||
timestamp. This list may be empty.
|
||||
|
||||
For example::
|
||||
|
||||
[
|
||||
{'timestamp': <Timestamp instance>, 'exclude': [1,3]},
|
||||
{'timestamp': <Timestamp instance>, 'exclude': []}
|
||||
]
|
||||
|
||||
The order of per-timestamp dicts in the frag_prefs list is significant
|
||||
and indicates descending preference for fragments from each timestamp
|
||||
i.e. a fragment that satisfies the first per-timestamp preference in
|
||||
the frag_prefs will be preferred over a fragment that satisfies a
|
||||
subsequent per-timestamp preferred, and so on.
|
||||
|
||||
If a timestamp is not cited in any per-timestamp preference dict then
|
||||
it is assumed that any fragment index at that timestamp may be used to
|
||||
construct the diskfile.
|
||||
|
||||
When a frag_prefs arg is provided, including an empty list, there is no
|
||||
requirement for there to be a durable file at the same timestamp as a
|
||||
data file that is chosen to construct the disk file
|
||||
|
||||
:param exts: dict of lists of file info, keyed by extension
|
||||
:param results: a dict that may be updated with results
|
||||
:param frag_index: if set, search for a specific fragment index .data
|
||||
file, otherwise accept the first valid .data file.
|
||||
:param frag_prefs: if set, search for any fragment index .data file
|
||||
that satisfies the frag_prefs.
|
||||
"""
|
||||
durable_info = None
|
||||
if exts.get('.durable'):
|
||||
@ -2841,23 +2917,66 @@ class ECDiskFileManager(BaseDiskFileManager):
|
||||
if durable_info and durable_info['timestamp'] == timestamp:
|
||||
durable_frag_set = frag_set
|
||||
|
||||
# Choose which frag set to use
|
||||
chosen_frag_set = None
|
||||
if frag_prefs is not None:
|
||||
candidate_frag_sets = dict(frag_sets)
|
||||
# For each per-timestamp frag preference dict, do we have any frag
|
||||
# indexes at that timestamp that are not in the exclusion list for
|
||||
# that timestamp? If so choose the highest of those frag_indexes.
|
||||
for ts, exclude_indexes in [
|
||||
(ts_pref['timestamp'], ts_pref['exclude'])
|
||||
for ts_pref in frag_prefs
|
||||
if ts_pref['timestamp'] in candidate_frag_sets]:
|
||||
available_indexes = [info['frag_index']
|
||||
for info in candidate_frag_sets[ts]]
|
||||
acceptable_indexes = list(set(available_indexes) -
|
||||
set(exclude_indexes))
|
||||
if acceptable_indexes:
|
||||
chosen_frag_set = candidate_frag_sets[ts]
|
||||
# override any frag_index passed in as method param with
|
||||
# the last (highest) acceptable_index
|
||||
frag_index = acceptable_indexes[-1]
|
||||
break
|
||||
else:
|
||||
# this frag_set has no acceptable frag index so
|
||||
# remove it from the candidate frag_sets
|
||||
candidate_frag_sets.pop(ts)
|
||||
else:
|
||||
# No acceptable frag index was found at any timestamp mentioned
|
||||
# in the frag_prefs. Choose the newest remaining candidate
|
||||
# frag_set - the proxy can decide if it wants the returned
|
||||
# fragment with that time.
|
||||
if candidate_frag_sets:
|
||||
ts_newest = sorted(candidate_frag_sets.keys())[-1]
|
||||
chosen_frag_set = candidate_frag_sets[ts_newest]
|
||||
else:
|
||||
chosen_frag_set = durable_frag_set
|
||||
|
||||
# Select a single chosen frag from the chosen frag_set, by either
|
||||
# matching against a specified frag_index or taking the highest index.
|
||||
chosen_frag = None
|
||||
if durable_frag_set:
|
||||
if chosen_frag_set:
|
||||
if frag_index is not None:
|
||||
# search the frag set to find the exact frag_index
|
||||
for info in durable_frag_set:
|
||||
for info in chosen_frag_set:
|
||||
if info['frag_index'] == frag_index:
|
||||
chosen_frag = info
|
||||
break
|
||||
else:
|
||||
chosen_frag = durable_frag_set[-1]
|
||||
chosen_frag = chosen_frag_set[-1]
|
||||
|
||||
# If we successfully found a frag then set results
|
||||
if chosen_frag:
|
||||
results['data_info'] = chosen_frag
|
||||
results['durable_frag_set'] = durable_frag_set
|
||||
results['chosen_frag_set'] = chosen_frag_set
|
||||
if chosen_frag_set != durable_frag_set:
|
||||
# hide meta files older than data file but newer than durable
|
||||
# file so they don't get marked as obsolete (we already threw
|
||||
# out .meta's that are older than a .durable)
|
||||
exts['.meta'], _older = self._split_gt_timestamp(
|
||||
exts['.meta'], chosen_frag['timestamp'])
|
||||
results['frag_sets'] = frag_sets
|
||||
|
||||
# Mark any isolated .durable as obsolete
|
||||
@ -2867,7 +2986,7 @@ class ECDiskFileManager(BaseDiskFileManager):
|
||||
|
||||
# Fragments *may* be ready for reclaim, unless they are durable
|
||||
for frag_set in frag_sets.values():
|
||||
if frag_set == durable_frag_set:
|
||||
if frag_set in (durable_frag_set, chosen_frag_set):
|
||||
continue
|
||||
results.setdefault('possible_reclaim', []).extend(frag_set)
|
||||
|
||||
@ -2876,19 +2995,24 @@ class ECDiskFileManager(BaseDiskFileManager):
|
||||
results.setdefault('possible_reclaim', []).extend(
|
||||
exts.get('.meta'))
|
||||
|
||||
def _verify_ondisk_files(self, results, frag_index=None, **kwargs):
|
||||
def _verify_ondisk_files(self, results, frag_index=None,
|
||||
frag_prefs=None, **kwargs):
|
||||
"""
|
||||
Verify that the final combination of on disk files complies with the
|
||||
erasure-coded diskfile contract.
|
||||
|
||||
:param results: files that have been found and accepted
|
||||
:param frag_index: specifies a specific fragment index .data file
|
||||
:param frag_prefs: if set, indicates that fragment preferences have
|
||||
been specified and therefore that a selected fragment is not
|
||||
required to be durable.
|
||||
:returns: True if the file combination is compliant, False otherwise
|
||||
"""
|
||||
if super(ECDiskFileManager, self)._verify_ondisk_files(
|
||||
results, **kwargs):
|
||||
have_data_file = results['data_file'] is not None
|
||||
have_durable = results.get('durable_frag_set') is not None
|
||||
have_durable = (results.get('durable_frag_set') is not None or
|
||||
(have_data_file and frag_prefs is not None))
|
||||
return have_data_file == have_durable
|
||||
return False
|
||||
|
||||
|
@ -33,7 +33,7 @@ from swift.common.utils import public, get_logger, \
|
||||
config_true_value, timing_stats, replication, \
|
||||
normalize_delete_at_timestamp, get_log_line, Timestamp, \
|
||||
get_expirer_container, parse_mime_headers, \
|
||||
iter_multipart_mime_documents, extract_swift_bytes
|
||||
iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_object_creation, \
|
||||
valid_timestamp, check_utf8
|
||||
@ -84,6 +84,15 @@ def drain(file_like, read_size, timeout):
|
||||
break
|
||||
|
||||
|
||||
def _make_backend_fragments_header(fragments):
|
||||
if fragments:
|
||||
result = {}
|
||||
for ts, frag_list in fragments.items():
|
||||
result[ts.internal] = frag_list
|
||||
return json.dumps(result)
|
||||
return None
|
||||
|
||||
|
||||
class EventletPlungerString(str):
|
||||
"""
|
||||
Eventlet won't send headers until it's accumulated at least
|
||||
@ -853,10 +862,12 @@ class ObjectController(BaseStorageServer):
|
||||
"""Handle HTTP GET requests for the Swift Object Server."""
|
||||
device, partition, account, container, obj, policy = \
|
||||
get_name_and_placement(request, 5, 5, True)
|
||||
frag_prefs = safe_json_loads(
|
||||
request.headers.get('X-Backend-Fragment-Preferences'))
|
||||
try:
|
||||
disk_file = self.get_diskfile(
|
||||
device, partition, account, container, obj,
|
||||
policy=policy)
|
||||
policy=policy, frag_prefs=frag_prefs)
|
||||
except DiskFileDeviceUnavailable:
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
try:
|
||||
@ -889,6 +900,13 @@ class ObjectController(BaseStorageServer):
|
||||
pass
|
||||
response.headers['X-Timestamp'] = file_x_ts.normal
|
||||
response.headers['X-Backend-Timestamp'] = file_x_ts.internal
|
||||
response.headers['X-Backend-Data-Timestamp'] = \
|
||||
disk_file.data_timestamp.internal
|
||||
if disk_file.durable_timestamp:
|
||||
response.headers['X-Backend-Durable-Timestamp'] = \
|
||||
disk_file.durable_timestamp.internal
|
||||
response.headers['X-Backend-Fragments'] = \
|
||||
_make_backend_fragments_header(disk_file.fragments)
|
||||
resp = request.get_response(response)
|
||||
except DiskFileXattrNotSupported:
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
@ -906,10 +924,12 @@ class ObjectController(BaseStorageServer):
|
||||
"""Handle HTTP HEAD requests for the Swift Object Server."""
|
||||
device, partition, account, container, obj, policy = \
|
||||
get_name_and_placement(request, 5, 5, True)
|
||||
frag_prefs = safe_json_loads(
|
||||
request.headers.get('X-Backend-Fragment-Preferences'))
|
||||
try:
|
||||
disk_file = self.get_diskfile(
|
||||
device, partition, account, container, obj,
|
||||
policy=policy)
|
||||
policy=policy, frag_prefs=frag_prefs)
|
||||
except DiskFileDeviceUnavailable:
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
try:
|
||||
@ -938,6 +958,13 @@ class ObjectController(BaseStorageServer):
|
||||
# Needed for container sync feature
|
||||
response.headers['X-Timestamp'] = ts.normal
|
||||
response.headers['X-Backend-Timestamp'] = ts.internal
|
||||
response.headers['X-Backend-Data-Timestamp'] = \
|
||||
disk_file.data_timestamp.internal
|
||||
if disk_file.durable_timestamp:
|
||||
response.headers['X-Backend-Durable-Timestamp'] = \
|
||||
disk_file.durable_timestamp.internal
|
||||
response.headers['X-Backend-Fragments'] = \
|
||||
_make_backend_fragments_header(disk_file.fragments)
|
||||
response.content_length = int(metadata['Content-Length'])
|
||||
try:
|
||||
response.content_encoding = metadata['Content-Encoding']
|
||||
|
@ -729,7 +729,7 @@ def bytes_to_skip(record_size, range_start):
|
||||
class ResumingGetter(object):
|
||||
def __init__(self, app, req, server_type, node_iter, partition, path,
|
||||
backend_headers, concurrency=1, client_chunk_size=None,
|
||||
newest=None):
|
||||
newest=None, header_provider=None):
|
||||
self.app = app
|
||||
self.node_iter = node_iter
|
||||
self.server_type = server_type
|
||||
@ -742,6 +742,8 @@ class ResumingGetter(object):
|
||||
self.used_nodes = []
|
||||
self.used_source_etag = ''
|
||||
self.concurrency = concurrency
|
||||
self.node = None
|
||||
self.header_provider = header_provider
|
||||
|
||||
# stuff from request
|
||||
self.req_method = req.method
|
||||
@ -1093,7 +1095,7 @@ class ResumingGetter(object):
|
||||
@property
|
||||
def last_headers(self):
|
||||
if self.source_headers:
|
||||
return self.source_headers[-1]
|
||||
return HeaderKeyDict(self.source_headers[-1])
|
||||
else:
|
||||
return None
|
||||
|
||||
@ -1101,13 +1103,17 @@ class ResumingGetter(object):
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
if node in self.used_nodes:
|
||||
return False
|
||||
req_headers = dict(self.backend_headers)
|
||||
# a request may be specialised with specific backend headers
|
||||
if self.header_provider:
|
||||
req_headers.update(self.header_provider())
|
||||
start_node_timing = time.time()
|
||||
try:
|
||||
with ConnectionTimeout(self.app.conn_timeout):
|
||||
conn = http_connect(
|
||||
node['ip'], node['port'], node['device'],
|
||||
self.partition, self.req_method, self.path,
|
||||
headers=self.backend_headers,
|
||||
headers=req_headers,
|
||||
query_string=self.req_query_string)
|
||||
self.app.set_node_timing(node, time.time() - start_node_timing)
|
||||
|
||||
@ -1212,6 +1218,7 @@ class ResumingGetter(object):
|
||||
self.used_source_etag = src_headers.get(
|
||||
'x-object-sysmeta-ec-etag',
|
||||
src_headers.get('etag', '')).strip('"')
|
||||
self.node = node
|
||||
return source, node
|
||||
return None, None
|
||||
|
||||
@ -1316,6 +1323,7 @@ class NodeIter(object):
|
||||
self.primary_nodes = self.app.sort_nodes(
|
||||
list(itertools.islice(node_iter, num_primary_nodes)))
|
||||
self.handoff_iter = node_iter
|
||||
self._node_provider = None
|
||||
|
||||
def __iter__(self):
|
||||
self._node_iter = self._node_gen()
|
||||
@ -1344,6 +1352,16 @@ class NodeIter(object):
|
||||
# all the primaries were skipped, and handoffs didn't help
|
||||
self.app.logger.increment('handoff_all_count')
|
||||
|
||||
def set_node_provider(self, callback):
|
||||
"""
|
||||
Install a callback function that will be used during a call to next()
|
||||
to get an alternate node instead of returning the next node from the
|
||||
iterator.
|
||||
:param callback: A no argument function that should return a node dict
|
||||
or None.
|
||||
"""
|
||||
self._node_provider = callback
|
||||
|
||||
def _node_gen(self):
|
||||
for node in self.primary_nodes:
|
||||
if not self.app.error_limited(node):
|
||||
@ -1364,6 +1382,11 @@ class NodeIter(object):
|
||||
return
|
||||
|
||||
def next(self):
|
||||
if self._node_provider:
|
||||
# give node provider the opportunity to inject a node
|
||||
node = self._node_provider()
|
||||
if node:
|
||||
return node
|
||||
return next(self._node_iter)
|
||||
|
||||
def __next__(self):
|
||||
|
@ -47,7 +47,7 @@ from swift.common.utils import (
|
||||
GreenAsyncPile, GreenthreadSafeIterator, Timestamp,
|
||||
normalize_delete_at_timestamp, public, get_expirer_container,
|
||||
document_iters_to_http_response_body, parse_content_range,
|
||||
quorum_size, reiterate, close_if_possible)
|
||||
quorum_size, reiterate, close_if_possible, safe_json_loads)
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_metadata, check_object_creation
|
||||
from swift.common import constraints
|
||||
@ -1835,9 +1835,262 @@ def trailing_metadata(policy, client_obj_hasher,
|
||||
})
|
||||
|
||||
|
||||
class ECGetResponseBucket(object):
|
||||
"""
|
||||
A helper class to encapsulate the properties of buckets in which fragment
|
||||
getters and alternate nodes are collected.
|
||||
"""
|
||||
def __init__(self, policy, timestamp_str):
|
||||
"""
|
||||
:param policy: an instance of ECStoragePolicy
|
||||
:param timestamp_str: a string representation of a timestamp
|
||||
"""
|
||||
self.policy = policy
|
||||
self.timestamp_str = timestamp_str
|
||||
self.gets = collections.defaultdict(list)
|
||||
self.alt_nodes = collections.defaultdict(list)
|
||||
self._durable = False
|
||||
self.status = self.headers = None
|
||||
|
||||
def set_durable(self):
|
||||
self._durable = True
|
||||
|
||||
def add_response(self, getter, parts_iter):
|
||||
if not self.gets:
|
||||
self.status = getter.last_status
|
||||
# stash first set of backend headers, which will be used to
|
||||
# populate a client response
|
||||
# TODO: each bucket is for a single *data* timestamp, but sources
|
||||
# in the same bucket may have different *metadata* timestamps if
|
||||
# some backends have more recent .meta files than others. Currently
|
||||
# we just use the last received metadata headers - this behavior is
|
||||
# ok and is consistent with a replication policy GET which
|
||||
# similarly does not attempt to find the backend with the most
|
||||
# recent metadata. We could alternatively choose to the *newest*
|
||||
# metadata headers for self.headers by selecting the source with
|
||||
# the latest X-Timestamp.
|
||||
self.headers = getter.last_headers
|
||||
elif (getter.last_headers.get('X-Object-Sysmeta-Ec-Etag') !=
|
||||
self.headers.get('X-Object-Sysmeta-Ec-Etag')):
|
||||
# Fragments at the same timestamp with different etags are never
|
||||
# expected. If somehow it happens then ignore those fragments
|
||||
# to avoid mixing fragments that will not reconstruct otherwise
|
||||
# an exception from pyeclib is almost certain. This strategy leaves
|
||||
# a possibility that a set of consistent frags will be gathered.
|
||||
raise ValueError("ETag mismatch")
|
||||
|
||||
frag_index = getter.last_headers.get('X-Object-Sysmeta-Ec-Frag-Index')
|
||||
frag_index = int(frag_index) if frag_index is not None else None
|
||||
self.gets[frag_index].append((getter, parts_iter))
|
||||
|
||||
def get_responses(self):
|
||||
"""
|
||||
Return a list of all useful sources. Where there are multiple sources
|
||||
associated with the same frag_index then only one is included.
|
||||
|
||||
:return: a list of sources, each source being a tuple of form
|
||||
(ResumingGetter, iter)
|
||||
"""
|
||||
all_sources = []
|
||||
for frag_index, sources in self.gets.items():
|
||||
if frag_index is None:
|
||||
# bad responses don't have a frag_index (and fake good
|
||||
# responses from some unit tests)
|
||||
all_sources.extend(sources)
|
||||
else:
|
||||
all_sources.extend(sources[:1])
|
||||
return all_sources
|
||||
|
||||
def add_alternate_nodes(self, node, frag_indexes):
|
||||
for frag_index in frag_indexes:
|
||||
self.alt_nodes[frag_index].append(node)
|
||||
|
||||
@property
|
||||
def shortfall(self):
|
||||
# A non-durable bucket always has a shortfall of at least 1
|
||||
result = self.policy.ec_ndata - len(self.get_responses())
|
||||
return max(result, 0 if self._durable else 1)
|
||||
|
||||
@property
|
||||
def shortfall_with_alts(self):
|
||||
# The shortfall that we expect to have if we were to send requests
|
||||
# for frags on the alt nodes.
|
||||
alts = set(self.alt_nodes.keys()).difference(set(self.gets.keys()))
|
||||
result = self.policy.ec_ndata - (len(self.get_responses()) + len(alts))
|
||||
return max(result, 0 if self._durable else 1)
|
||||
|
||||
def __str__(self):
|
||||
# return a string summarising bucket state, useful for debugging.
|
||||
return '<%s, %s, %s, %s(%s), %s>' \
|
||||
% (self.timestamp_str, self.status, self._durable,
|
||||
self.shortfall, self.shortfall_with_alts, len(self.gets))
|
||||
|
||||
|
||||
class ECGetResponseCollection(object):
|
||||
"""
|
||||
Manages all successful EC GET responses gathered by ResumingGetters.
|
||||
|
||||
A response comprises a tuple of (<getter instance>, <parts iterator>). All
|
||||
responses having the same data timestamp are placed in an
|
||||
ECGetResponseBucket for that timestamp. The buckets are stored in the
|
||||
'buckets' dict which maps timestamp-> bucket.
|
||||
|
||||
This class encapsulates logic for selecting the best bucket from the
|
||||
collection, and for choosing alternate nodes.
|
||||
"""
|
||||
def __init__(self, policy):
|
||||
"""
|
||||
:param policy: an instance of ECStoragePolicy
|
||||
"""
|
||||
self.policy = policy
|
||||
self.buckets = {}
|
||||
self.node_iter_count = 0
|
||||
|
||||
def _get_bucket(self, timestamp_str):
|
||||
"""
|
||||
:param timestamp_str: a string representation of a timestamp
|
||||
:return: ECGetResponseBucket for given timestamp
|
||||
"""
|
||||
return self.buckets.setdefault(
|
||||
timestamp_str, ECGetResponseBucket(self.policy, timestamp_str))
|
||||
|
||||
def add_response(self, get, parts_iter):
|
||||
"""
|
||||
Add a response to the collection.
|
||||
|
||||
:param get: An instance of
|
||||
:class:`~swift.proxy.controllers.base.ResumingGetter`
|
||||
:param parts_iter: An iterator over response body parts
|
||||
:raises ValueError: if the response etag or status code values do not
|
||||
match any values previously received for the same timestamp
|
||||
"""
|
||||
headers = get.last_headers
|
||||
# Add the response to the appropriate bucket keyed by data file
|
||||
# timestamp. Fall back to using X-Backend-Timestamp as key for object
|
||||
# servers that have not been upgraded.
|
||||
t_data_file = headers.get('X-Backend-Data-Timestamp')
|
||||
t_obj = headers.get('X-Backend-Timestamp', headers.get('X-Timestamp'))
|
||||
self._get_bucket(t_data_file or t_obj).add_response(get, parts_iter)
|
||||
|
||||
# The node may also have alternate fragments indexes (possibly at
|
||||
# different timestamps). For each list of alternate fragments indexes,
|
||||
# find the bucket for their data file timestamp and add the node and
|
||||
# list to that bucket's alternate nodes.
|
||||
frag_sets = safe_json_loads(headers.get('X-Backend-Fragments')) or {}
|
||||
for t_frag, frag_set in frag_sets.items():
|
||||
self._get_bucket(t_frag).add_alternate_nodes(get.node, frag_set)
|
||||
# If the response includes a durable timestamp then mark that bucket as
|
||||
# durable. Note that this may be a different bucket than the one this
|
||||
# response got added to, and that we may never go and get a durable
|
||||
# frag from this node; it is sufficient that we have been told that a
|
||||
# .durable exists, somewhere, at t_durable.
|
||||
t_durable = headers.get('X-Backend-Durable-Timestamp')
|
||||
if not t_durable and not t_data_file:
|
||||
# obj server not upgraded so assume this response's frag is durable
|
||||
t_durable = t_obj
|
||||
if t_durable:
|
||||
self._get_bucket(t_durable).set_durable()
|
||||
|
||||
def _sort_buckets(self):
|
||||
def key_fn(bucket):
|
||||
# Returns a tuple to use for sort ordering:
|
||||
# buckets with no shortfall sort higher,
|
||||
# otherwise buckets with lowest shortfall_with_alts sort higher,
|
||||
# finally buckets with newer timestamps sort higher.
|
||||
return (bucket.shortfall <= 0,
|
||||
(not (bucket.shortfall <= 0) and
|
||||
(-1 * bucket.shortfall_with_alts)),
|
||||
bucket.timestamp_str)
|
||||
|
||||
return sorted(self.buckets.values(), key=key_fn, reverse=True)
|
||||
|
||||
@property
|
||||
def best_bucket(self):
|
||||
"""
|
||||
Return the best bucket in the collection.
|
||||
|
||||
The "best" bucket is the newest timestamp with sufficient getters, or
|
||||
the closest to having a sufficient getters, unless it is bettered by a
|
||||
bucket with potential alternate nodes.
|
||||
|
||||
:return: An instance of :class:`~ECGetResponseBucket` or None if there
|
||||
are no buckets in the collection.
|
||||
"""
|
||||
sorted_buckets = self._sort_buckets()
|
||||
if sorted_buckets:
|
||||
return sorted_buckets[0]
|
||||
return None
|
||||
|
||||
def _get_frag_prefs(self):
|
||||
# Construct the current frag_prefs list, with best_bucket prefs first.
|
||||
frag_prefs = []
|
||||
|
||||
for bucket in self._sort_buckets():
|
||||
if bucket.timestamp_str:
|
||||
exclusions = [fi for fi in bucket.gets if fi is not None]
|
||||
prefs = {'timestamp': bucket.timestamp_str,
|
||||
'exclude': exclusions}
|
||||
frag_prefs.append(prefs)
|
||||
|
||||
return frag_prefs
|
||||
|
||||
def get_extra_headers(self):
|
||||
frag_prefs = self._get_frag_prefs()
|
||||
return {'X-Backend-Fragment-Preferences': json.dumps(frag_prefs)}
|
||||
|
||||
def _get_alternate_nodes(self):
|
||||
if self.node_iter_count <= self.policy.ec_ndata:
|
||||
# It makes sense to wait before starting to use alternate nodes,
|
||||
# because if we find sufficient frags on *distinct* nodes then we
|
||||
# spread work across mode nodes. There's no formal proof that
|
||||
# waiting for ec_ndata GETs is the right answer, but it seems
|
||||
# reasonable to try *at least* that many primary nodes before
|
||||
# resorting to alternate nodes.
|
||||
return None
|
||||
|
||||
bucket = self.best_bucket
|
||||
if (bucket is None) or (bucket.shortfall <= 0):
|
||||
return None
|
||||
|
||||
alt_frags = set(bucket.alt_nodes.keys())
|
||||
got_frags = set(bucket.gets.keys())
|
||||
wanted_frags = list(alt_frags.difference(got_frags))
|
||||
|
||||
# We may have the same frag_index on more than one node so shuffle to
|
||||
# avoid using the same frag_index consecutively, since we may not get a
|
||||
# response from the last node provided before being asked to provide
|
||||
# another node.
|
||||
random.shuffle(wanted_frags)
|
||||
|
||||
for frag_index in wanted_frags:
|
||||
nodes = bucket.alt_nodes.get(frag_index)
|
||||
if nodes:
|
||||
return nodes
|
||||
return None
|
||||
|
||||
def has_alternate_node(self):
|
||||
return True if self._get_alternate_nodes() else False
|
||||
|
||||
def provide_alternate_node(self):
|
||||
"""
|
||||
Callback function that is installed in a NodeIter. Called on every call
|
||||
to NodeIter.next(), which means we can track the number of nodes to
|
||||
which GET requests have been made and selectively inject an alternate
|
||||
node, if we have one.
|
||||
|
||||
:return: A dict describing a node to which the next GET request
|
||||
should be made.
|
||||
"""
|
||||
self.node_iter_count += 1
|
||||
nodes = self._get_alternate_nodes()
|
||||
if nodes:
|
||||
return nodes.pop(0).copy()
|
||||
|
||||
|
||||
@ObjectControllerRouter.register(EC_POLICY)
|
||||
class ECObjectController(BaseObjectController):
|
||||
def _fragment_GET_request(self, req, node_iter, partition, policy):
|
||||
def _fragment_GET_request(self, req, node_iter, partition, policy,
|
||||
header_provider=None):
|
||||
"""
|
||||
Makes a GET request for a fragment.
|
||||
"""
|
||||
@ -1848,7 +2101,7 @@ class ECObjectController(BaseObjectController):
|
||||
partition, req.swift_entity_path,
|
||||
backend_headers,
|
||||
client_chunk_size=policy.fragment_size,
|
||||
newest=False)
|
||||
newest=False, header_provider=header_provider)
|
||||
return (getter, getter.response_parts_iter(req))
|
||||
|
||||
def _convert_range(self, req, policy):
|
||||
@ -1914,93 +2167,130 @@ class ECObjectController(BaseObjectController):
|
||||
resp = self.GETorHEAD_base(
|
||||
req, _('Object'), node_iter, partition,
|
||||
req.swift_entity_path, concurrency)
|
||||
else: # GET request
|
||||
orig_range = None
|
||||
range_specs = []
|
||||
if req.range:
|
||||
orig_range = req.range
|
||||
range_specs = self._convert_range(req, policy)
|
||||
self._fix_response(req, resp)
|
||||
return resp
|
||||
|
||||
safe_iter = GreenthreadSafeIterator(node_iter)
|
||||
# Sending the request concurrently to all nodes, and responding
|
||||
# with the first response isn't something useful for EC as all
|
||||
# nodes contain different fragments. Also EC has implemented it's
|
||||
# own specific implementation of concurrent gets to ec_ndata nodes.
|
||||
# So we don't need to worry about plumbing and sending a
|
||||
# concurrency value to ResumingGetter.
|
||||
with ContextPool(policy.ec_ndata) as pool:
|
||||
pile = GreenAsyncPile(pool)
|
||||
for _junk in range(policy.ec_ndata):
|
||||
pile.spawn(self._fragment_GET_request,
|
||||
req, safe_iter, partition,
|
||||
policy)
|
||||
# GET request
|
||||
orig_range = None
|
||||
range_specs = []
|
||||
if req.range:
|
||||
orig_range = req.range
|
||||
range_specs = self._convert_range(req, policy)
|
||||
|
||||
bad_gets = []
|
||||
etag_buckets = collections.defaultdict(list)
|
||||
best_etag = None
|
||||
for get, parts_iter in pile:
|
||||
if is_success(get.last_status):
|
||||
etag = HeaderKeyDict(
|
||||
get.last_headers)['X-Object-Sysmeta-Ec-Etag']
|
||||
etag_buckets[etag].append((get, parts_iter))
|
||||
if etag != best_etag and (
|
||||
len(etag_buckets[etag]) >
|
||||
len(etag_buckets[best_etag])):
|
||||
best_etag = etag
|
||||
else:
|
||||
bad_gets.append((get, parts_iter))
|
||||
matching_response_count = max(
|
||||
len(etag_buckets[best_etag]), len(bad_gets))
|
||||
if (policy.ec_ndata - matching_response_count >
|
||||
pile._pending) and node_iter.nodes_left > 0:
|
||||
# we need more matching responses to reach ec_ndata
|
||||
# than we have pending gets, as long as we still have
|
||||
# nodes in node_iter we can spawn another
|
||||
pile.spawn(self._fragment_GET_request, req,
|
||||
safe_iter, partition, policy)
|
||||
safe_iter = GreenthreadSafeIterator(node_iter)
|
||||
# Sending the request concurrently to all nodes, and responding
|
||||
# with the first response isn't something useful for EC as all
|
||||
# nodes contain different fragments. Also EC has implemented it's
|
||||
# own specific implementation of concurrent gets to ec_ndata nodes.
|
||||
# So we don't need to worry about plumbing and sending a
|
||||
# concurrency value to ResumingGetter.
|
||||
with ContextPool(policy.ec_ndata) as pool:
|
||||
pile = GreenAsyncPile(pool)
|
||||
buckets = ECGetResponseCollection(policy)
|
||||
node_iter.set_node_provider(buckets.provide_alternate_node)
|
||||
# include what may well be an empty X-Backend-Fragment-Preferences
|
||||
# header from the buckets.get_extra_headers to let the object
|
||||
# server know that it is ok to return non-durable fragments
|
||||
for _junk in range(policy.ec_ndata):
|
||||
pile.spawn(self._fragment_GET_request,
|
||||
req, safe_iter, partition,
|
||||
policy, buckets.get_extra_headers)
|
||||
|
||||
req.range = orig_range
|
||||
if len(etag_buckets[best_etag]) >= policy.ec_ndata:
|
||||
# headers can come from any of the getters
|
||||
resp_headers = HeaderKeyDict(
|
||||
etag_buckets[best_etag][0][0].source_headers[-1])
|
||||
resp_headers.pop('Content-Range', None)
|
||||
eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
|
||||
obj_length = int(eccl) if eccl is not None else None
|
||||
|
||||
# This is only true if we didn't get a 206 response, but
|
||||
# that's the only time this is used anyway.
|
||||
fa_length = int(resp_headers['Content-Length'])
|
||||
app_iter = ECAppIter(
|
||||
req.swift_entity_path,
|
||||
policy,
|
||||
[iterator for getter, iterator in etag_buckets[best_etag]],
|
||||
range_specs, fa_length, obj_length,
|
||||
self.app.logger)
|
||||
resp = Response(
|
||||
request=req,
|
||||
headers=resp_headers,
|
||||
conditional_response=True,
|
||||
app_iter=app_iter)
|
||||
bad_bucket = ECGetResponseBucket(policy, None)
|
||||
bad_bucket.set_durable()
|
||||
best_bucket = None
|
||||
extra_requests = 0
|
||||
# max_extra_requests is an arbitrary hard limit for spawning extra
|
||||
# getters in case some unforeseen scenario, or a misbehaving object
|
||||
# server, causes us to otherwise make endless requests e.g. if an
|
||||
# object server were to ignore frag_prefs and always respond with
|
||||
# a frag that is already in a bucket.
|
||||
max_extra_requests = 2 * policy.ec_nparity + policy.ec_ndata
|
||||
for get, parts_iter in pile:
|
||||
if get.last_status is None:
|
||||
# We may have spawned getters that find the node iterator
|
||||
# has been exhausted. Ignore them.
|
||||
# TODO: turns out that node_iter.nodes_left can bottom
|
||||
# out at >0 when number of devs in ring is < 2* replicas,
|
||||
# which definitely happens in tests and results in status
|
||||
# of None. We should fix that but keep this guard because
|
||||
# there is also a race between testing nodes_left/spawning
|
||||
# a getter and an existing getter calling next(node_iter).
|
||||
continue
|
||||
try:
|
||||
app_iter.kickoff(req, resp)
|
||||
except HTTPException as err_resp:
|
||||
# catch any HTTPException response here so that we can
|
||||
# process response headers uniformly in _fix_response
|
||||
resp = err_resp
|
||||
else:
|
||||
statuses = []
|
||||
reasons = []
|
||||
bodies = []
|
||||
headers = []
|
||||
for getter, body_parts_iter in bad_gets:
|
||||
statuses.extend(getter.statuses)
|
||||
reasons.extend(getter.reasons)
|
||||
bodies.extend(getter.bodies)
|
||||
headers.extend(getter.source_headers)
|
||||
resp = self.best_response(
|
||||
req, statuses, reasons, bodies, 'Object',
|
||||
headers=headers)
|
||||
if is_success(get.last_status):
|
||||
# 2xx responses are managed by a response collection
|
||||
buckets.add_response(get, parts_iter)
|
||||
else:
|
||||
# all other responses are lumped into a single bucket
|
||||
bad_bucket.add_response(get, parts_iter)
|
||||
except ValueError as err:
|
||||
self.app.logger.error(
|
||||
_("Problem with fragment response: %s"), err)
|
||||
shortfall = bad_bucket.shortfall
|
||||
best_bucket = buckets.best_bucket
|
||||
if best_bucket:
|
||||
shortfall = min(best_bucket.shortfall, shortfall)
|
||||
if (extra_requests < max_extra_requests and
|
||||
shortfall > pile._pending and
|
||||
(node_iter.nodes_left > 0 or
|
||||
buckets.has_alternate_node())):
|
||||
# we need more matching responses to reach ec_ndata
|
||||
# than we have pending gets, as long as we still have
|
||||
# nodes in node_iter we can spawn another
|
||||
extra_requests += 1
|
||||
pile.spawn(self._fragment_GET_request, req,
|
||||
safe_iter, partition, policy,
|
||||
buckets.get_extra_headers)
|
||||
|
||||
req.range = orig_range
|
||||
if best_bucket and best_bucket.shortfall <= 0:
|
||||
# headers can come from any of the getters
|
||||
resp_headers = best_bucket.headers
|
||||
resp_headers.pop('Content-Range', None)
|
||||
eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
|
||||
obj_length = int(eccl) if eccl is not None else None
|
||||
|
||||
# This is only true if we didn't get a 206 response, but
|
||||
# that's the only time this is used anyway.
|
||||
fa_length = int(resp_headers['Content-Length'])
|
||||
app_iter = ECAppIter(
|
||||
req.swift_entity_path,
|
||||
policy,
|
||||
[parts_iter for
|
||||
_getter, parts_iter in best_bucket.get_responses()],
|
||||
range_specs, fa_length, obj_length,
|
||||
self.app.logger)
|
||||
resp = Response(
|
||||
request=req,
|
||||
headers=resp_headers,
|
||||
conditional_response=True,
|
||||
app_iter=app_iter)
|
||||
try:
|
||||
app_iter.kickoff(req, resp)
|
||||
except HTTPException as err_resp:
|
||||
# catch any HTTPException response here so that we can
|
||||
# process response headers uniformly in _fix_response
|
||||
resp = err_resp
|
||||
else:
|
||||
# TODO: we can get here if all buckets are successful but none
|
||||
# have ec_ndata getters, so bad_bucket may have no gets and we will
|
||||
# return a 503 when a 404 may be more appropriate. We can also get
|
||||
# here with less than ec_ndata 416's and may then return a 416
|
||||
# which is also questionable because a non-range get for same
|
||||
# object would return 404 or 503.
|
||||
statuses = []
|
||||
reasons = []
|
||||
bodies = []
|
||||
headers = []
|
||||
for getter, _parts_iter in bad_bucket.get_responses():
|
||||
statuses.extend(getter.statuses)
|
||||
reasons.extend(getter.reasons)
|
||||
bodies.extend(getter.bodies)
|
||||
headers.extend(getter.source_headers)
|
||||
resp = self.best_response(
|
||||
req, statuses, reasons, bodies, 'Object',
|
||||
headers=headers)
|
||||
self._fix_response(req, resp)
|
||||
return resp
|
||||
|
||||
|
@ -3664,6 +3664,36 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
os.close(fd)
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
def test_safe_json_loads(self):
|
||||
expectations = {
|
||||
None: None,
|
||||
'': None,
|
||||
0: None,
|
||||
1: None,
|
||||
'"asdf"': 'asdf',
|
||||
'[]': [],
|
||||
'{}': {},
|
||||
"{'foo': 'bar'}": None,
|
||||
'{"foo": "bar"}': {'foo': 'bar'},
|
||||
}
|
||||
|
||||
failures = []
|
||||
for value, expected in expectations.items():
|
||||
try:
|
||||
result = utils.safe_json_loads(value)
|
||||
except Exception as e:
|
||||
# it's called safe, if it blows up the test blows up
|
||||
self.fail('%r caused safe method to throw %r!' % (
|
||||
value, e))
|
||||
try:
|
||||
self.assertEqual(expected, result)
|
||||
except AssertionError:
|
||||
failures.append('%r => %r (expected %r)' % (
|
||||
value, result, expected))
|
||||
if failures:
|
||||
self.fail('Invalid results from pure function:\n%s' %
|
||||
'\n'.join(failures))
|
||||
|
||||
|
||||
class ResellerConfReader(unittest.TestCase):
|
||||
|
||||
|
@ -591,14 +591,16 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
def tearDown(self):
|
||||
rmtree(self.tmpdir, ignore_errors=1)
|
||||
|
||||
def _get_diskfile(self, policy, frag_index=None):
|
||||
def _get_diskfile(self, policy, frag_index=None, **kwargs):
|
||||
df_mgr = self.df_router[policy]
|
||||
return df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_index=frag_index)
|
||||
policy=policy, frag_index=frag_index,
|
||||
**kwargs)
|
||||
|
||||
def _test_get_ondisk_files(self, scenarios, policy,
|
||||
frag_index=None):
|
||||
class_under_test = self._get_diskfile(policy, frag_index=frag_index)
|
||||
frag_index=None, **kwargs):
|
||||
class_under_test = self._get_diskfile(
|
||||
policy, frag_index=frag_index, **kwargs)
|
||||
for test in scenarios:
|
||||
# test => [('filename.ext', '.ext'|False, ...), ...]
|
||||
expected = {
|
||||
@ -610,7 +612,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
files = list(list(zip(*test))[0])
|
||||
|
||||
for _order in ('ordered', 'shuffled', 'shuffled'):
|
||||
class_under_test = self._get_diskfile(policy, frag_index)
|
||||
class_under_test = self._get_diskfile(
|
||||
policy, frag_index=frag_index, **kwargs)
|
||||
try:
|
||||
actual = class_under_test._get_ondisk_files(files)
|
||||
self._assertDictContainsSubset(
|
||||
@ -621,8 +624,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.fail('%s with files %s' % (str(e), files))
|
||||
shuffle(files)
|
||||
|
||||
def _test_cleanup_ondisk_files_files(self, scenarios, policy,
|
||||
reclaim_age=None):
|
||||
def _test_cleanup_ondisk_files(self, scenarios, policy,
|
||||
reclaim_age=None):
|
||||
# check that expected files are left in hashdir after cleanup
|
||||
for test in scenarios:
|
||||
class_under_test = self.df_router[policy]
|
||||
@ -753,8 +756,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
[('%s.meta' % older, False, False),
|
||||
('%s.ts' % older, False, False)]]
|
||||
|
||||
self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default,
|
||||
reclaim_age=1000)
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
|
||||
reclaim_age=1000)
|
||||
|
||||
def test_construct_dev_path(self):
|
||||
res_path = self.df_mgr.construct_dev_path('abc')
|
||||
@ -1165,7 +1168,7 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
||||
]
|
||||
|
||||
self._test_get_ondisk_files(scenarios, POLICIES[0], None)
|
||||
self._test_cleanup_ondisk_files_files(scenarios, POLICIES[0])
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES[0])
|
||||
self._test_yield_hashes_cleanup(scenarios, POLICIES[0])
|
||||
|
||||
def test_get_ondisk_files_with_stray_meta(self):
|
||||
@ -1248,8 +1251,8 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
||||
[('%s.meta' % older, '.meta'),
|
||||
('%s.data' % much_older, '.data')]]
|
||||
|
||||
self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default,
|
||||
reclaim_age=1000)
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
|
||||
reclaim_age=1000)
|
||||
|
||||
def test_yield_hashes(self):
|
||||
old_ts = '1383180000.12345'
|
||||
@ -1437,23 +1440,6 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
||||
('0000000007.00000#1.data', '.data'),
|
||||
('0000000007.00000#0.data', False, True)],
|
||||
|
||||
# data with no durable is ignored
|
||||
[('0000000007.00000#0.data', False, True)],
|
||||
|
||||
# data newer than tombstone with no durable is ignored
|
||||
[('0000000007.00000#0.data', False, True),
|
||||
('0000000006.00000.ts', '.ts', True)],
|
||||
|
||||
# data newer than durable is ignored
|
||||
[('0000000008.00000#1.data', False, True),
|
||||
('0000000007.00000.durable', '.durable'),
|
||||
('0000000007.00000#1.data', '.data'),
|
||||
('0000000007.00000#0.data', False, True)],
|
||||
|
||||
# data newer than durable ignored, even if its only data
|
||||
[('0000000008.00000#1.data', False, True),
|
||||
('0000000007.00000.durable', False, False)],
|
||||
|
||||
# data older than durable is ignored
|
||||
[('0000000007.00000.durable', '.durable'),
|
||||
('0000000007.00000#1.data', '.data'),
|
||||
@ -1489,16 +1475,79 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
||||
('0000000006.00000.ts', '.ts'),
|
||||
('0000000006.00000.durable', False),
|
||||
('0000000006.00000#0.data', False)],
|
||||
|
||||
# missing durable invalidates data
|
||||
[('0000000006.00000.meta', False, True),
|
||||
('0000000006.00000#0.data', False, True)]
|
||||
]
|
||||
|
||||
self._test_get_ondisk_files(scenarios, POLICIES.default, None)
|
||||
self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default)
|
||||
# these scenarios have same outcome regardless of whether any
|
||||
# fragment preferences are specified
|
||||
self._test_get_ondisk_files(scenarios, POLICIES.default,
|
||||
frag_index=None)
|
||||
self._test_get_ondisk_files(scenarios, POLICIES.default,
|
||||
frag_index=None, frag_prefs=[])
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
|
||||
self._test_yield_hashes_cleanup(scenarios, POLICIES.default)
|
||||
|
||||
# next scenarios have different outcomes dependent on whether a
|
||||
# frag_prefs parameter is passed to diskfile constructor or not
|
||||
scenarios = [
|
||||
# data with no durable is ignored
|
||||
[('0000000007.00000#0.data', False, True)],
|
||||
|
||||
# data newer than tombstone with no durable is ignored
|
||||
[('0000000007.00000#0.data', False, True),
|
||||
('0000000006.00000.ts', '.ts', True)],
|
||||
|
||||
# data newer than durable is ignored
|
||||
[('0000000009.00000#2.data', False, True),
|
||||
('0000000009.00000#1.data', False, True),
|
||||
('0000000008.00000#3.data', False, True),
|
||||
('0000000007.00000.durable', '.durable'),
|
||||
('0000000007.00000#1.data', '.data'),
|
||||
('0000000007.00000#0.data', False, True)],
|
||||
|
||||
# data newer than durable ignored, even if its only data
|
||||
[('0000000008.00000#1.data', False, True),
|
||||
('0000000007.00000.durable', False, False)],
|
||||
|
||||
# missing durable invalidates data, older meta deleted
|
||||
[('0000000007.00000.meta', False, True),
|
||||
('0000000006.00000#0.data', False, True),
|
||||
('0000000005.00000.meta', False, False),
|
||||
('0000000004.00000#1.data', False, True)]]
|
||||
|
||||
self._test_get_ondisk_files(scenarios, POLICIES.default,
|
||||
frag_index=None)
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
|
||||
|
||||
scenarios = [
|
||||
# data with no durable is chosen
|
||||
[('0000000007.00000#0.data', '.data', True)],
|
||||
|
||||
# data newer than tombstone with no durable is chosen
|
||||
[('0000000007.00000#0.data', '.data', True),
|
||||
('0000000006.00000.ts', False, True)],
|
||||
|
||||
# data newer than durable is chosen, older data preserved
|
||||
[('0000000009.00000#2.data', '.data', True),
|
||||
('0000000009.00000#1.data', False, True),
|
||||
('0000000008.00000#3.data', False, True),
|
||||
('0000000007.00000.durable', False, True),
|
||||
('0000000007.00000#1.data', False, True),
|
||||
('0000000007.00000#0.data', False, True)],
|
||||
|
||||
# data newer than durable chosen when its only data
|
||||
[('0000000008.00000#1.data', '.data', True),
|
||||
('0000000007.00000.durable', False, False)],
|
||||
|
||||
# data plus meta chosen without durable, older meta deleted
|
||||
[('0000000007.00000.meta', '.meta', True),
|
||||
('0000000006.00000#0.data', '.data', True),
|
||||
('0000000005.00000.meta', False, False),
|
||||
('0000000004.00000#1.data', False, True)]]
|
||||
|
||||
self._test_get_ondisk_files(scenarios, POLICIES.default,
|
||||
frag_index=None, frag_prefs=[])
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
|
||||
|
||||
def test_get_ondisk_files_with_ec_policy_and_frag_index(self):
|
||||
# Each scenario specifies a list of (filename, extension) tuples. If
|
||||
# extension is set then that filename should be returned by the method
|
||||
@ -1512,20 +1561,20 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
||||
[('0000000007.00000#2.data', False, True),
|
||||
('0000000007.00000#1.data', False, True),
|
||||
('0000000007.00000#0.data', False, True),
|
||||
('0000000006.00000.durable', '.durable')],
|
||||
('0000000006.00000.durable', False)],
|
||||
|
||||
# specific frag older than durable is ignored
|
||||
[('0000000007.00000#2.data', False),
|
||||
('0000000007.00000#1.data', False),
|
||||
('0000000007.00000#0.data', False),
|
||||
('0000000008.00000.durable', '.durable')],
|
||||
('0000000008.00000.durable', False)],
|
||||
|
||||
# specific frag older than newest durable is ignored
|
||||
# even if is also has a durable
|
||||
[('0000000007.00000#2.data', False),
|
||||
('0000000007.00000#1.data', False),
|
||||
('0000000007.00000.durable', False),
|
||||
('0000000008.00000#0.data', False),
|
||||
('0000000008.00000#0.data', False, True),
|
||||
('0000000008.00000.durable', '.durable')],
|
||||
|
||||
# meta included when frag index is specified
|
||||
@ -1559,12 +1608,23 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
||||
# frag_index, get_ondisk_files will tolerate .meta with
|
||||
# no .data
|
||||
[('0000000088.00000.meta', False, True),
|
||||
('0000000077.00000.durable', '.durable')]
|
||||
('0000000077.00000.durable', False, False)]
|
||||
]
|
||||
|
||||
self._test_get_ondisk_files(scenarios, POLICIES.default, frag_index=1)
|
||||
# note: not calling self._test_cleanup_ondisk_files_files(scenarios, 0)
|
||||
# here due to the anomalous scenario as commented above
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
|
||||
|
||||
# scenarios for empty frag_prefs, meaning durable not required
|
||||
scenarios = [
|
||||
# specific frag newer than durable is chosen
|
||||
[('0000000007.00000#2.data', False, True),
|
||||
('0000000007.00000#1.data', '.data', True),
|
||||
('0000000007.00000#0.data', False, True),
|
||||
('0000000006.00000.durable', False, False)],
|
||||
]
|
||||
self._test_get_ondisk_files(scenarios, POLICIES.default, frag_index=1,
|
||||
frag_prefs=[])
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default)
|
||||
|
||||
def test_cleanup_ondisk_files_reclaim_with_data_files(self):
|
||||
# Each scenario specifies a list of (filename, extension, [survives])
|
||||
@ -1622,8 +1682,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
||||
[('%s.meta' % older, False, False),
|
||||
('%s.durable' % much_older, False, False)]]
|
||||
|
||||
self._test_cleanup_ondisk_files_files(scenarios, POLICIES.default,
|
||||
reclaim_age=1000)
|
||||
self._test_cleanup_ondisk_files(scenarios, POLICIES.default,
|
||||
reclaim_age=1000)
|
||||
|
||||
def test_get_ondisk_files_with_stray_meta(self):
|
||||
# get_ondisk_files ignores a stray .meta file
|
||||
@ -4574,6 +4634,233 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
|
||||
df.open()
|
||||
self.assertEqual(ts1, df.durable_timestamp)
|
||||
|
||||
def test_open_with_fragment_preferences(self):
|
||||
policy = POLICIES.default
|
||||
df_mgr = self.df_router[policy]
|
||||
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0',
|
||||
'a', 'c', 'o', policy=policy)
|
||||
|
||||
ts_1, ts_2, ts_3, ts_4 = (self.ts() for _ in range(4))
|
||||
|
||||
# create two durable frags, first with index 0
|
||||
with df.create() as writer:
|
||||
data = 'test data'
|
||||
writer.write(data)
|
||||
frag_0_metadata = {
|
||||
'ETag': md5(data).hexdigest(),
|
||||
'X-Timestamp': ts_1.internal,
|
||||
'Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': 0,
|
||||
}
|
||||
writer.put(frag_0_metadata)
|
||||
writer.commit(ts_1)
|
||||
|
||||
# second with index 3
|
||||
with df.create() as writer:
|
||||
data = 'test data'
|
||||
writer.write(data)
|
||||
frag_3_metadata = {
|
||||
'ETag': md5(data).hexdigest(),
|
||||
'X-Timestamp': ts_1.internal,
|
||||
'Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': 3,
|
||||
}
|
||||
writer.put(frag_3_metadata)
|
||||
writer.commit(ts_1)
|
||||
|
||||
# sanity check: should have 2 * .data plus a .durable
|
||||
self.assertEqual(3, len(os.listdir(df._datadir)))
|
||||
|
||||
# add some .meta stuff
|
||||
meta_1_metadata = {
|
||||
'X-Object-Meta-Foo': 'Bar',
|
||||
'X-Timestamp': ts_2.internal,
|
||||
}
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0',
|
||||
'a', 'c', 'o', policy=policy)
|
||||
df.write_metadata(meta_1_metadata)
|
||||
# sanity check: should have 2 * .data, .durable, .meta
|
||||
self.assertEqual(4, len(os.listdir(df._datadir)))
|
||||
|
||||
# sanity: should get frag index 3
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0',
|
||||
'a', 'c', 'o', policy=policy)
|
||||
expected = dict(frag_3_metadata)
|
||||
expected.update(meta_1_metadata)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
|
||||
# add a newer datafile for frag index 2
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0',
|
||||
'a', 'c', 'o', policy=policy)
|
||||
with df.create() as writer:
|
||||
data = 'new test data'
|
||||
writer.write(data)
|
||||
frag_2_metadata = {
|
||||
'ETag': md5(data).hexdigest(),
|
||||
'X-Timestamp': ts_3.internal,
|
||||
'Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': 2,
|
||||
}
|
||||
writer.put(frag_2_metadata)
|
||||
# N.B. don't make it durable - skip call to commit()
|
||||
# sanity check: should have 2* .data, .durable, .meta, .data
|
||||
self.assertEqual(5, len(os.listdir(df._datadir)))
|
||||
|
||||
# sanity check: with no frag preferences we get old metadata
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_2.internal, df.timestamp)
|
||||
self.assertEqual(ts_1.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# with empty frag preferences we get metadata from newer non-durable
|
||||
# data file
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_prefs=[])
|
||||
self.assertEqual(frag_2_metadata, df.read_metadata())
|
||||
self.assertEqual(ts_3.internal, df.timestamp)
|
||||
self.assertEqual(ts_3.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# check we didn't destroy any potentially valid data by opening the
|
||||
# non-durable data file
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
|
||||
# now add some newer .meta stuff which should replace older .meta
|
||||
meta_2_metadata = {
|
||||
'X-Object-Meta-Foo': 'BarBarBarAnne',
|
||||
'X-Timestamp': ts_4.internal,
|
||||
}
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0',
|
||||
'a', 'c', 'o', policy=policy)
|
||||
df.write_metadata(meta_2_metadata)
|
||||
# sanity check: should have 2 * .data, .durable, .data, .meta
|
||||
self.assertEqual(5, len(os.listdir(df._datadir)))
|
||||
|
||||
# sanity check: with no frag preferences we get newer metadata applied
|
||||
# to durable data file
|
||||
expected = dict(frag_3_metadata)
|
||||
expected.update(meta_2_metadata)
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_4.internal, df.timestamp)
|
||||
self.assertEqual(ts_1.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# with empty frag preferences we still get metadata from newer .meta
|
||||
# but applied to non-durable data file
|
||||
expected = dict(frag_2_metadata)
|
||||
expected.update(meta_2_metadata)
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_prefs=[])
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_4.internal, df.timestamp)
|
||||
self.assertEqual(ts_3.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# check we didn't destroy any potentially valid data by opening the
|
||||
# non-durable data file
|
||||
expected = dict(frag_3_metadata)
|
||||
expected.update(meta_2_metadata)
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_4.internal, df.timestamp)
|
||||
self.assertEqual(ts_1.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# prefer frags at ts_1, exclude no indexes, expect highest frag index
|
||||
prefs = [{'timestamp': ts_1.internal, 'exclude': []},
|
||||
{'timestamp': ts_2.internal, 'exclude': []},
|
||||
{'timestamp': ts_3.internal, 'exclude': []}]
|
||||
expected = dict(frag_3_metadata)
|
||||
expected.update(meta_2_metadata)
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_prefs=prefs)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_4.internal, df.timestamp)
|
||||
self.assertEqual(ts_1.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# prefer frags at ts_1, exclude frag index 3 so expect frag index 0
|
||||
prefs = [{'timestamp': ts_1.internal, 'exclude': [3]},
|
||||
{'timestamp': ts_2.internal, 'exclude': []},
|
||||
{'timestamp': ts_3.internal, 'exclude': []}]
|
||||
expected = dict(frag_0_metadata)
|
||||
expected.update(meta_2_metadata)
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_prefs=prefs)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_4.internal, df.timestamp)
|
||||
self.assertEqual(ts_1.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# now make ts_3 the preferred timestamp, excluded indexes don't exist
|
||||
prefs = [{'timestamp': ts_3.internal, 'exclude': [4, 5, 6]},
|
||||
{'timestamp': ts_2.internal, 'exclude': []},
|
||||
{'timestamp': ts_1.internal, 'exclude': []}]
|
||||
expected = dict(frag_2_metadata)
|
||||
expected.update(meta_2_metadata)
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_prefs=prefs)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_4.internal, df.timestamp)
|
||||
self.assertEqual(ts_3.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
# now make ts_2 the preferred timestamp - there are no frags at ts_2,
|
||||
# next preference is ts_3 but index 2 is excluded, then at ts_1 index 3
|
||||
# is excluded so we get frag 0 at ts_1
|
||||
prefs = [{'timestamp': ts_2.internal, 'exclude': [1]},
|
||||
{'timestamp': ts_3.internal, 'exclude': [2]},
|
||||
{'timestamp': ts_1.internal, 'exclude': [3]}]
|
||||
|
||||
expected = dict(frag_0_metadata)
|
||||
expected.update(meta_2_metadata)
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_prefs=prefs)
|
||||
self.assertEqual(expected, df.read_metadata())
|
||||
self.assertEqual(ts_4.internal, df.timestamp)
|
||||
self.assertEqual(ts_1.internal, df.data_timestamp)
|
||||
self.assertEqual(ts_1.internal, df.durable_timestamp)
|
||||
self.assertEqual({ts_1: [0, 3], ts_3: [2]}, df.fragments)
|
||||
|
||||
def test_open_with_bad_fragment_preferences(self):
|
||||
policy = POLICIES.default
|
||||
df_mgr = self.df_router[policy]
|
||||
|
||||
for bad in (
|
||||
'ouch',
|
||||
2,
|
||||
[{'timestamp': '1234.5678', 'excludes': [1]}, {}],
|
||||
[{'timestamp': 'not a timestamp', 'excludes': [1, 2]}],
|
||||
[{'timestamp': '1234.5678', 'excludes': [1, -1]}],
|
||||
[{'timestamp': '1234.5678', 'excludes': 1}],
|
||||
[{'timestamp': '1234.5678'}],
|
||||
[{'excludes': [1, 2]}]
|
||||
|
||||
):
|
||||
try:
|
||||
df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c', 'o',
|
||||
policy=policy, frag_prefs=bad)
|
||||
self.fail('Expected DiskFileError for bad frag_prefs: %r'
|
||||
% bad)
|
||||
except DiskFileError as e:
|
||||
self.assertIn('frag_prefs', str(e))
|
||||
|
||||
|
||||
@patch_policies(with_ec_default=True)
|
||||
class TestSuffixHashes(unittest.TestCase):
|
||||
|
@ -233,6 +233,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'Content-Encoding': 'gzip',
|
||||
'X-Backend-Timestamp': post_timestamp,
|
||||
'X-Timestamp': post_timestamp,
|
||||
'X-Backend-Data-Timestamp': put_timestamp,
|
||||
'X-Backend-Durable-Timestamp': put_timestamp,
|
||||
'Last-Modified': strftime(
|
||||
'%a, %d %b %Y %H:%M:%S GMT',
|
||||
gmtime(math.ceil(float(post_timestamp)))),
|
||||
@ -266,6 +268,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Object-Sysmeta-Color': 'blue',
|
||||
'X-Backend-Timestamp': post_timestamp,
|
||||
'X-Timestamp': post_timestamp,
|
||||
'X-Backend-Data-Timestamp': put_timestamp,
|
||||
'X-Backend-Durable-Timestamp': put_timestamp,
|
||||
'Last-Modified': strftime(
|
||||
'%a, %d %b %Y %H:%M:%S GMT',
|
||||
gmtime(math.ceil(float(post_timestamp)))),
|
||||
@ -308,6 +312,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Static-Large-Object': 'True',
|
||||
'X-Backend-Timestamp': put_timestamp,
|
||||
'X-Timestamp': put_timestamp,
|
||||
'X-Backend-Data-Timestamp': put_timestamp,
|
||||
'X-Backend-Durable-Timestamp': put_timestamp,
|
||||
'Last-Modified': strftime(
|
||||
'%a, %d %b %Y %H:%M:%S GMT',
|
||||
gmtime(math.ceil(float(put_timestamp)))),
|
||||
@ -338,6 +344,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Static-Large-Object': 'True',
|
||||
'X-Backend-Timestamp': post_timestamp,
|
||||
'X-Timestamp': post_timestamp,
|
||||
'X-Backend-Data-Timestamp': put_timestamp,
|
||||
'X-Backend-Durable-Timestamp': put_timestamp,
|
||||
'Last-Modified': strftime(
|
||||
'%a, %d %b %Y %H:%M:%S GMT',
|
||||
gmtime(math.ceil(float(post_timestamp)))),
|
||||
@ -368,6 +376,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Static-Large-Object': 'True',
|
||||
'X-Backend-Timestamp': post_timestamp,
|
||||
'X-Timestamp': post_timestamp,
|
||||
'X-Backend-Data-Timestamp': put_timestamp,
|
||||
'X-Backend-Durable-Timestamp': put_timestamp,
|
||||
'Last-Modified': strftime(
|
||||
'%a, %d %b %Y %H:%M:%S GMT',
|
||||
gmtime(math.ceil(float(post_timestamp)))),
|
||||
@ -3185,6 +3195,238 @@ class TestObjectController(unittest.TestCase):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 412)
|
||||
|
||||
def _create_ondisk_fragments(self, policy):
|
||||
# Create some on disk files...
|
||||
ts_iter = make_timestamp_iter()
|
||||
|
||||
# PUT at ts_0
|
||||
ts_0 = next(ts_iter)
|
||||
headers = {'X-Timestamp': ts_0.internal,
|
||||
'Content-Length': '5',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Backend-Storage-Policy-Index': int(policy)}
|
||||
if policy.policy_type == EC_POLICY:
|
||||
headers['X-Object-Sysmeta-Ec-Frag-Index'] = '0'
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers=headers)
|
||||
req.body = 'OLDER'
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# POST at ts_1
|
||||
ts_1 = next(ts_iter)
|
||||
headers = {'X-Timestamp': ts_1.internal,
|
||||
'X-Backend-Storage-Policy-Index': int(policy)}
|
||||
headers['X-Object-Meta-Test'] = 'abc'
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers=headers)
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
|
||||
# PUT again at ts_2 but without a .durable file
|
||||
ts_2 = next(ts_iter)
|
||||
headers = {'X-Timestamp': ts_2.internal,
|
||||
'Content-Length': '5',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Backend-Storage-Policy-Index': int(policy)}
|
||||
if policy.policy_type == EC_POLICY:
|
||||
headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2'
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers=headers)
|
||||
req.body = 'NEWER'
|
||||
# patch the commit method to do nothing so EC object gets
|
||||
# no .durable file
|
||||
with mock.patch('swift.obj.diskfile.ECDiskFileWriter.commit'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
return ts_0, ts_1, ts_2
|
||||
|
||||
def test_GET_HEAD_with_fragment_preferences(self):
|
||||
for policy in POLICIES:
|
||||
ts_0, ts_1, ts_2 = self._create_ondisk_fragments(policy)
|
||||
|
||||
backend_frags = json.dumps({ts_0.internal: [0],
|
||||
ts_2.internal: [2]})
|
||||
|
||||
def _assert_frag_0_at_ts_0(resp):
|
||||
expect = {
|
||||
'X-Timestamp': ts_1.normal,
|
||||
'X-Backend-Timestamp': ts_1.internal,
|
||||
'X-Backend-Data-Timestamp': ts_0.internal,
|
||||
'X-Backend-Durable-Timestamp': ts_0.internal,
|
||||
'X-Backend-Fragments': backend_frags,
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': '0',
|
||||
'X-Object-Meta-Test': 'abc'}
|
||||
self.assertDictContainsSubset(expect, resp.headers)
|
||||
|
||||
def _assert_repl_data_at_ts_2():
|
||||
self.assertIn(resp.status_int, (200, 202))
|
||||
expect = {
|
||||
'X-Timestamp': ts_2.normal,
|
||||
'X-Backend-Timestamp': ts_2.internal,
|
||||
'X-Backend-Data-Timestamp': ts_2.internal,
|
||||
'X-Backend-Durable-Timestamp': ts_2.internal}
|
||||
self.assertDictContainsSubset(expect, resp.headers)
|
||||
self.assertNotIn('X-Object-Meta-Test', resp.headers)
|
||||
|
||||
# Sanity check: Request with no preferences should default to the
|
||||
# durable frag
|
||||
headers = {'X-Backend-Storage-Policy-Index': int(policy)}
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_0_at_ts_0(resp)
|
||||
self.assertEqual(resp.body, 'OLDER')
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
self.assertEqual(resp.body, 'NEWER')
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_0_at_ts_0(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
|
||||
# Request with preferences can select the older frag
|
||||
prefs = json.dumps(
|
||||
[{'timestamp': ts_0.internal, 'exclude': [1, 3]}])
|
||||
headers = {'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Backend-Fragment-Preferences': prefs}
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_0_at_ts_0(resp)
|
||||
self.assertEqual(resp.body, 'OLDER')
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
self.assertEqual(resp.body, 'NEWER')
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_0_at_ts_0(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
|
||||
def _assert_frag_2_at_ts_2(resp):
|
||||
self.assertIn(resp.status_int, (200, 202))
|
||||
# do not expect meta file to be included since it is older
|
||||
expect = {
|
||||
'X-Timestamp': ts_2.normal,
|
||||
'X-Backend-Timestamp': ts_2.internal,
|
||||
'X-Backend-Data-Timestamp': ts_2.internal,
|
||||
'X-Backend-Durable-Timestamp': ts_0.internal,
|
||||
'X-Backend-Fragments': backend_frags,
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': '2'}
|
||||
self.assertDictContainsSubset(expect, resp.headers)
|
||||
self.assertNotIn('X-Object-Meta-Test', resp.headers)
|
||||
|
||||
# Request with preferences can select the newer non-durable frag
|
||||
prefs = json.dumps(
|
||||
[{'timestamp': ts_2.internal, 'exclude': [1, 3]}])
|
||||
headers = {'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Backend-Fragment-Preferences': prefs}
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_2_at_ts_2(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
self.assertEqual(resp.body, 'NEWER')
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_2_at_ts_2(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
|
||||
# Request with preference for ts_0 but excludes index 0 will
|
||||
# default to newest frag
|
||||
prefs = json.dumps(
|
||||
[{'timestamp': ts_0.internal, 'exclude': [0]}])
|
||||
headers = {'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Backend-Fragment-Preferences': prefs}
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_2_at_ts_2(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
self.assertEqual(resp.body, 'NEWER')
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_2_at_ts_2(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
|
||||
# Request with preferences that exclude all frags get nothing
|
||||
prefs = json.dumps(
|
||||
[{'timestamp': ts_0.internal, 'exclude': [0]},
|
||||
{'timestamp': ts_2.internal, 'exclude': [2]}])
|
||||
headers = {'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Backend-Fragment-Preferences': prefs}
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
if policy.policy_type == EC_POLICY:
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
self.assertEqual(resp.body, 'NEWER')
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
|
||||
# Request with empty preferences will get non-durable
|
||||
prefs = json.dumps([])
|
||||
headers = {'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Backend-Fragment-Preferences': prefs}
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_2_at_ts_2(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
self.assertEqual(resp.body, 'NEWER')
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o', headers=headers,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
if policy.policy_type == EC_POLICY:
|
||||
_assert_frag_2_at_ts_2(resp)
|
||||
else:
|
||||
_assert_repl_data_at_ts_2()
|
||||
|
||||
def test_GET_quarantine(self):
|
||||
# Test swift.obj.server.ObjectController.GET
|
||||
timestamp = normalize_timestamp(time())
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -24,11 +24,12 @@ import sys
|
||||
import traceback
|
||||
import unittest
|
||||
from contextlib import contextmanager
|
||||
from shutil import rmtree, copyfile
|
||||
from shutil import rmtree, copyfile, move
|
||||
import gc
|
||||
import time
|
||||
from textwrap import dedent
|
||||
from hashlib import md5
|
||||
import collections
|
||||
from pyeclib.ec_iface import ECDriverError
|
||||
from tempfile import mkdtemp, NamedTemporaryFile
|
||||
import weakref
|
||||
@ -55,7 +56,7 @@ from swift.common.utils import hash_path, storage_directory, \
|
||||
from test.unit import (
|
||||
connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
|
||||
FakeMemcache, debug_logger, patch_policies, write_fake_ring,
|
||||
mocked_http_conn, DEFAULT_TEST_EC_TYPE)
|
||||
mocked_http_conn, DEFAULT_TEST_EC_TYPE, make_timestamp_iter)
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers.obj import ReplicatedObjectController
|
||||
from swift.obj import server as object_server
|
||||
@ -5595,6 +5596,255 @@ class TestECMismatchedFA(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 503)
|
||||
|
||||
|
||||
class TestECGets(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
prosrv = _test_servers[0]
|
||||
# don't leak error limits and poison other tests
|
||||
prosrv._error_limiting = {}
|
||||
|
||||
def _setup_nodes_and_do_GET(self, objs, node_state):
|
||||
"""
|
||||
A helper method that creates object fragments, stashes them in temp
|
||||
dirs, and then moves selected fragments back into the hash_dirs on each
|
||||
node according to a specified desired node state description.
|
||||
|
||||
:param objs: a dict that maps object references to dicts that describe
|
||||
the object timestamp and content. Object frags will be
|
||||
created for each item in this dict.
|
||||
:param node_state: a dict that maps a node index to the desired state
|
||||
for that node. Each desired state is a list of
|
||||
dicts, with each dict describing object reference,
|
||||
frag_index and file extensions to be moved to the
|
||||
node's hash_dir.
|
||||
"""
|
||||
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv,
|
||||
obj2srv, obj3srv) = _test_servers
|
||||
ec_policy = POLICIES[3]
|
||||
container_name = uuid.uuid4().hex
|
||||
obj_name = uuid.uuid4().hex
|
||||
obj_path = os.path.join(os.sep, 'v1', 'a', container_name, obj_name)
|
||||
|
||||
# PUT container, make sure it worked
|
||||
container_path = os.path.join(os.sep, 'v1', 'a', container_name)
|
||||
ec_container = Request.blank(
|
||||
container_path, environ={"REQUEST_METHOD": "PUT"},
|
||||
headers={"X-Storage-Policy": "ec", "X-Auth-Token": "t"})
|
||||
resp = ec_container.get_response(prosrv)
|
||||
self.assertIn(resp.status_int, (201, 202))
|
||||
|
||||
partition, nodes = \
|
||||
ec_policy.object_ring.get_nodes('a', container_name, obj_name)
|
||||
|
||||
# map nodes to hash dirs
|
||||
node_hash_dirs = {}
|
||||
node_tmp_dirs = collections.defaultdict(dict)
|
||||
for node in nodes:
|
||||
node_hash_dirs[node['index']] = os.path.join(
|
||||
_testdir, node['device'], storage_directory(
|
||||
diskfile.get_data_dir(ec_policy),
|
||||
partition, hash_path('a', container_name, obj_name)))
|
||||
|
||||
def _put_object(ref, timestamp, body):
|
||||
# PUT an object and then move its disk files to a temp dir
|
||||
headers = {"X-Timestamp": timestamp.internal}
|
||||
put_req1 = Request.blank(obj_path, method='PUT', headers=headers)
|
||||
put_req1.body = body
|
||||
resp = put_req1.get_response(prosrv)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# GET the obj, should work fine
|
||||
get_req = Request.blank(obj_path, method="GET")
|
||||
resp = get_req.get_response(prosrv)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, body)
|
||||
|
||||
# move all hash dir files to per-node, per-obj tempdir
|
||||
for node_index, hash_dir in node_hash_dirs.items():
|
||||
node_tmp_dirs[node_index][ref] = mkdtemp()
|
||||
for f in os.listdir(hash_dir):
|
||||
move(os.path.join(hash_dir, f),
|
||||
os.path.join(node_tmp_dirs[node_index][ref], f))
|
||||
|
||||
for obj_ref, obj_info in objs.items():
|
||||
_put_object(obj_ref, **obj_info)
|
||||
|
||||
# sanity check - all hash_dirs are empty and GET returns a 404
|
||||
for hash_dir in node_hash_dirs.values():
|
||||
self.assertFalse(os.listdir(hash_dir))
|
||||
get_req = Request.blank(obj_path, method="GET")
|
||||
resp = get_req.get_response(prosrv)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
# node state is in form:
|
||||
# {node_index: [{ref: object reference,
|
||||
# frag_index: index,
|
||||
# exts: ['.data' etc]}, ...],
|
||||
# node_index: ...}
|
||||
for node_index, state in node_state.items():
|
||||
dest = node_hash_dirs[node_index]
|
||||
for frag_info in state:
|
||||
src = node_tmp_dirs[frag_info['frag_index']][frag_info['ref']]
|
||||
src_files = [f for f in os.listdir(src)
|
||||
if f.endswith(frag_info['exts'])]
|
||||
self.assertEqual(len(frag_info['exts']), len(src_files),
|
||||
'Bad test setup for node %s, obj %s'
|
||||
% (node_index, frag_info['ref']))
|
||||
for f in src_files:
|
||||
move(os.path.join(src, f), os.path.join(dest, f))
|
||||
|
||||
# do an object GET
|
||||
get_req = Request.blank(obj_path, method='GET')
|
||||
return get_req.get_response(prosrv)
|
||||
|
||||
def test_GET_with_missing_durables(self):
|
||||
# verify object GET behavior when durable files are missing
|
||||
ts_iter = make_timestamp_iter()
|
||||
objs = {'obj1': dict(timestamp=next(ts_iter), body='body')}
|
||||
|
||||
# durable missing from 2/3 nodes
|
||||
node_state = {
|
||||
0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
|
||||
1: [dict(ref='obj1', frag_index=1, exts=('.data',))],
|
||||
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, objs['obj1']['body'])
|
||||
|
||||
# all files missing on 1 node, durable missing from 1/2 other nodes
|
||||
# durable missing from 2/3 nodes
|
||||
node_state = {
|
||||
0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
|
||||
1: [],
|
||||
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, objs['obj1']['body'])
|
||||
|
||||
# durable missing from all 3 nodes
|
||||
node_state = {
|
||||
0: [dict(ref='obj1', frag_index=0, exts=('.data',))],
|
||||
1: [dict(ref='obj1', frag_index=1, exts=('.data',))],
|
||||
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 503)
|
||||
|
||||
def test_GET_with_multiple_frags_per_node(self):
|
||||
# verify object GET behavior when multiple fragments are on same node
|
||||
ts_iter = make_timestamp_iter()
|
||||
objs = {'obj1': dict(timestamp=next(ts_iter), body='body')}
|
||||
|
||||
# scenario: only two frags, both on same node
|
||||
node_state = {
|
||||
0: [],
|
||||
1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')),
|
||||
dict(ref='obj1', frag_index=1, exts=('.data',))],
|
||||
2: []
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, objs['obj1']['body'])
|
||||
|
||||
# scenario: all 3 frags on same node
|
||||
node_state = {
|
||||
0: [],
|
||||
1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')),
|
||||
dict(ref='obj1', frag_index=1, exts=('.data',)),
|
||||
dict(ref='obj1', frag_index=2, exts=('.data',))],
|
||||
2: []
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, objs['obj1']['body'])
|
||||
|
||||
def test_GET_with_multiple_timestamps_on_nodes(self):
|
||||
ts_iter = make_timestamp_iter()
|
||||
|
||||
ts_1, ts_2, ts_3 = [next(ts_iter) for _ in range(3)]
|
||||
objs = {'obj1': dict(timestamp=ts_1, body='body1'),
|
||||
'obj2': dict(timestamp=ts_2, body='body2'),
|
||||
'obj3': dict(timestamp=ts_3, body='body3')}
|
||||
|
||||
# newer non-durable frags do not prevent proxy getting the durable obj1
|
||||
node_state = {
|
||||
0: [dict(ref='obj3', frag_index=0, exts=('.data',)),
|
||||
dict(ref='obj2', frag_index=0, exts=('.data',)),
|
||||
dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
|
||||
1: [dict(ref='obj3', frag_index=1, exts=('.data',)),
|
||||
dict(ref='obj2', frag_index=1, exts=('.data',)),
|
||||
dict(ref='obj1', frag_index=1, exts=('.data', '.durable'))],
|
||||
2: [dict(ref='obj3', frag_index=2, exts=('.data',)),
|
||||
dict(ref='obj2', frag_index=2, exts=('.data',)),
|
||||
dict(ref='obj1', frag_index=2, exts=('.data', '.durable'))],
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, objs['obj1']['body'])
|
||||
|
||||
# .durables at two timestamps: in this scenario proxy is guaranteed
|
||||
# to see the durable at ts_2 with one of the first 2 responses, so will
|
||||
# then prefer that when requesting from third obj server
|
||||
node_state = {
|
||||
0: [dict(ref='obj3', frag_index=0, exts=('.data',)),
|
||||
dict(ref='obj2', frag_index=0, exts=('.data',)),
|
||||
dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
|
||||
1: [dict(ref='obj3', frag_index=1, exts=('.data',)),
|
||||
dict(ref='obj2', frag_index=1, exts=('.data', '.durable'))],
|
||||
2: [dict(ref='obj3', frag_index=2, exts=('.data',)),
|
||||
dict(ref='obj2', frag_index=2, exts=('.data', '.durable'))],
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, objs['obj2']['body'])
|
||||
|
||||
def test_GET_with_same_frag_index_on_multiple_nodes(self):
|
||||
ts_iter = make_timestamp_iter()
|
||||
|
||||
# this is a trick to be able to get identical frags placed onto
|
||||
# multiple nodes: since we cannot *copy* frags, we generate three sets
|
||||
# of identical frags at same timestamp so we have enough to *move*
|
||||
ts_1 = next(ts_iter)
|
||||
objs = {'obj1a': dict(timestamp=ts_1, body='body'),
|
||||
'obj1b': dict(timestamp=ts_1, body='body'),
|
||||
'obj1c': dict(timestamp=ts_1, body='body')}
|
||||
|
||||
# arrange for duplicate frag indexes across nodes: because the object
|
||||
# server prefers the highest available frag index, proxy will first get
|
||||
# back two responses with frag index 1, and will then return to node 0
|
||||
# for frag_index 0.
|
||||
node_state = {
|
||||
0: [dict(ref='obj1a', frag_index=0, exts=('.data',)),
|
||||
dict(ref='obj1a', frag_index=1, exts=('.data',))],
|
||||
1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))],
|
||||
2: [dict(ref='obj1c', frag_index=1, exts=('.data', '.durable'))]
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, objs['obj1a']['body'])
|
||||
|
||||
# if all we have across nodes are frags with same index then expect a
|
||||
# 404 (the third, 'extra', obj server GET will return 404 because it
|
||||
# will be sent frag prefs that exclude frag_index 1)
|
||||
node_state = {
|
||||
0: [dict(ref='obj1a', frag_index=1, exts=('.data',))],
|
||||
1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))],
|
||||
2: [dict(ref='obj1c', frag_index=1, exts=('.data',))]
|
||||
}
|
||||
|
||||
resp = self._setup_nodes_and_do_GET(objs, node_state)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
|
||||
class TestObjectDisconnectCleanup(unittest.TestCase):
|
||||
|
||||
# update this if you need to make more different devices in do_setup
|
||||
|
Loading…
Reference in New Issue
Block a user