EC Fragment Duplication - Foundational Global EC Cluster Support

This patch enables efficient PUT/GET for a globally distributed cluster[1].

Problem:
Erasure coding can reduce the amount of data actually stored compared to
the replicated model. For example, an ec_k=6, ec_m=3 scheme stores 1.5x
the original data, which is smaller than the 3x of replication.
However, unlike replication, erasure coding requires at least ec_k of the
total ec_k + ec_m fragments to be available to service a read (e.g. 6 of
9 in the case above). As such, if we store an EC object in a Swift
cluster spanning 2 geographically distributed data centers with the same
volume of disks, the fragments will likely be stored about evenly (4 and
5), so we still need to access a faraway data center to decode the
original object. In addition, if one of the data centers were lost in a
disaster, the stored objects would be lost forever, and we would have to
cry a lot. To ensure highly durable storage, you might think of making
*more* parity fragments (e.g. ec_k=6, ec_m=10); unfortunately this causes
*significant* performance degradation due to the cost of the mathematical
calculations for erasure coding encode/decode.
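
To make the overhead comparison concrete, here is a tiny sketch of the
raw storage multiplier (plain arithmetic, not Swift code):

    def storage_overhead(ec_k, ec_m):
        # An object becomes ec_k data fragments plus ec_m parity
        # fragments, so the bytes on disk are (ec_k + ec_m) / ec_k
        # times the object size.
        return float(ec_k + ec_m) / ec_k

    print(storage_overhead(6, 3))    # 1.5x, vs. 3.0x for 3 replicas
    print(storage_overhead(6, 10))   # ~2.67x for the "more parity" idea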

How this resolves the problem:
EC Fragment Duplication extends the initial solution by adding *more*
fragments from which to rebuild an object, similar to the approach
described above. The difference is that it makes *copies* of the encoded
fragments. Experimental results[1][2] show that employing a small ec_k
and ec_m gives sufficient performance to store and retrieve objects.

On PUT:

- Encode the incoming object with small ec_k and ec_m  <- faster!
- Make duplicate copies of the encoded fragments. The # of copies
  is determined by 'ec_duplication_factor' in swift.conf (see the
  sketch below)
- Store all fragments in the Swift Global EC Cluster
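
A minimal sketch of the duplication step (illustrative only; it mirrors
the chunk_transformer change further below, assuming a PyECLib-style
encode() that returns one fragment per index):

    def duplicate_fragments(pyeclib_driver, chunk, ec_duplication_factor):
        # encode() returns [frag_0, ..., frag_{k+m-1}]; repeating the
        # list yields one fragment per object-ring node, so node i gets
        # the fragment whose backend index is i % (ec_k + ec_m).
        encoded = pyeclib_driver.encode(chunk)
        return encoded * ec_duplication_factor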

The duplicated fragments increase the pressure on the existing
requirements for decoding objects in service of a read request.  All
fragments are stored with their X-Object-Sysmeta-Ec-Frag-Index.  With
this change, the X-Object-Sysmeta-Ec-Frag-Index represents the actual
fragment index encoded by PyECLib, so there *will* be duplicates.  Any
time we must decode the original object data, we need ec_k fragments
that are unique according to their X-Object-Sysmeta-Ec-Frag-Index.  No
duplicate X-Object-Sysmeta-Ec-Frag-Index may be used when decoding an
object; duplicates should be expected and avoided where possible.
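
A sketch of that uniqueness rule (illustrative, not the actual proxy or
reconstructor code): keep at most one response per
X-Object-Sysmeta-Ec-Frag-Index and hand exactly ec_k distinct fragments
to the decoder:

    def pick_unique_fragments(responses, ec_ndata):
        # Duplicates of a fragment index carry identical data, so only
        # the first response seen for each index is kept.
        by_index = {}
        for resp in responses:
            frag_index = int(
                resp.headers['X-Object-Sysmeta-Ec-Frag-Index'])
            by_index.setdefault(frag_index, resp)
        if len(by_index) < ec_ndata:
            raise ValueError('need %d unique fragments, got %d'
                             % (ec_ndata, len(by_index)))
        return list(by_index.values())[:ec_ndata]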

On GET:

This patch includes the following changes:
- Change the GET path to sort primary nodes into grouped subsets, so
  that each subset includes unique fragments (a rough sketch follows
  this list)
- Change the Reconstructor to be aware of possibly duplicated fragments
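
A rough sketch of the subset grouping (hypothetical helper, not the
actual proxy code): with 6 primaries and 3 unique fragments, the first
subset covers backend indexes 0-2 and the second subset covers their
duplicates:

    def group_primaries_by_subset(primary_nodes, ec_n_unique_fragments):
        # primary_nodes is in ring order, so a node's position modulo
        # ec_n_unique_fragments is its backend fragment index; each
        # slice therefore covers every unique index exactly once.
        return [primary_nodes[i:i + ec_n_unique_fragments]
                for i in range(0, len(primary_nodes),
                               ec_n_unique_fragments)]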

For example, with this change, a policy could be configured such that

swift.conf:
ec_num_data_fragments = 2
ec_num_parity_fragments = 1
ec_duplication_factor = 2
(object ring must have 6 replicas)

At Object-Server:
node index (from object ring):  0 1 2 3 4 5 <- keep node index for
                                               reconstruct decision
X-Object-Sysmeta-Ec-Frag-Index: 0 1 2 0 1 2 <- each object keeps actual
                                               fragment index for
                                               backend (PyEClib)
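
The mapping from ring node index to backend fragment index is simply the
node index modulo the number of unique fragments, as in the
get_backend_index() helper added by this patch; a stand-alone sketch:

    ec_n_unique_fragments = 3  # ec_num_data + ec_num_parity fragments

    for node_index in range(6):  # 6 replicas in the object ring
        backend_index = node_index % ec_n_unique_fragments
        print(node_index, backend_index)  # 0->0 1->1 2->2 3->0 4->1 5->2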

Additional improvements to Global EC Cluster Support will require
features such as Composite Rings, and more efficient fragment
rebalance/reconstruction.

1: http://goo.gl/IYiNPk (Swift Design Spec Repository)
2: http://goo.gl/frgj6w (Slide Share for OpenStack Summit Tokyo)

Doc-Impact

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: Idd155401982a2c48110c30b480966a863f6bd305
Kota Tsuyuzaki 2015-08-06 01:06:47 -07:00
parent cdd72dd34f
commit 40ba7f6172
13 changed files with 2994 additions and 1235 deletions


@ -205,6 +205,11 @@ an EC policy can be setup is shown below::
ec_num_parity_fragments = 4
ec_object_segment_size = 1048576
# Duplication of EC fragments is proof-of-concept experimental support to enable
# Global Erasure Coding policies with multiple regions acting as independent
# failure domains. Do not change the default except in development/testing.
ec_duplication_factor = 1
Let's take a closer look at each configuration parameter:
* ``name``: This is a standard storage policy parameter.
@ -223,6 +228,11 @@ Let's take a closer look at each configuration parameter:
comprised of parity.
* ``ec_object_segment_size``: The amount of data that will be buffered up before
feeding a segment into the encoder/decoder. The default value is 1048576.
* ``ec_duplication_factor``: The number of duplicate copies for each fragment.
This is currently experimental support to enable Global Erasure Coding policies
with multiple regions. Do not change the default except in development/testing,
and please read the "EC Duplication" section below before changing the default
value.
When PyECLib encodes an object, it will break it into N fragments. However, what
is important during configuration, is how many of those are data and how many
@ -273,6 +283,94 @@ For at least the initial version of EC, it is not recommended that an EC scheme
span beyond a single region; neither performance nor functional validation has
been done in such a configuration.
EC Duplication
^^^^^^^^^^^^^^
ec_duplication_factor is an option to make duplicate copies of fragments
of erasure encoded Swift objects. The default value is 1 (no duplication).
If an erasure code storage policy is configured with a non-default
ec_duplication_factor of N > 1, then the policy will create N duplicates of
each unique fragment that is returned from the configured EC engine.
Duplication of EC fragments is useful for EC storage policies which require
dispersion of fragment data across failure domains. Without duplication, almost
all common EC parameters, such as 10-4, assign fewer than 1/(the number of
failure domains) of the total unique fragments to each failure domain, which is
usually fewer than the number of data fragments required to reconstruct the
original data. Guaranteeing enough fragments per failure domain would otherwise
require more parity fragments. For such situations, empirical testing has shown
that using duplication is more efficient in the PUT path than encoding a scheme
with num_parity > num_data, although Swift EC also supports such a scheme. You
should evaluate which strategy works best in your environment.
For example, a 10-4 scheme with a duplication factor of 2 will store 28
fragments (i.e. (``ec_num_data_fragments`` + ``ec_num_parity_fragments``) *
``ec_duplication_factor``). This \*can\* allow a failure domain to rebuild
an object to full durability even when \*more\* than 14 fragments are
unavailable.
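As a quick sanity check, the fragment count and PUT quorum for such a
policy can be computed as follows (a sketch; ``min_parity_needed`` stands
in for the value PyECLib reports via ``min_parity_fragments_needed()``,
assumed to be 1 here for illustration)::

    ec_ndata, ec_nparity, factor = 10, 4, 2
    min_parity_needed = 1  # assumed; really reported by the EC backend

    total_fragments = (ec_ndata + ec_nparity) * factor    # 28
    put_quorum = (ec_ndata + min_parity_needed) * factor  # 22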
.. note::
EC Duplication is currently a part of the ongoing work on EC region
support, so some known issues remain before complete region support is
achieved:
Known-Issues:
- Unique fragment dispersion
Currently, Swift \*doesn't\* guarantee that the dispersion of unique
fragments' locations across the globally distributed cluster is robust
in the disaster recovery case. While the goal is to have duplicates
of each unique fragment placed in each region, it is currently
possible for duplicates of the same unique fragment to be placed in
the same region. Since a set of ``ec_num_data_fragments`` unique
fragments is required to reconstruct an object, the suboptimal
distribution of duplicates across regions may, in some cases, make it
impossible to assemble such a set from a single region.
For example, if we have a Swift cluster with 2 regions, the fragments may
be placed as follows:
::
r1
#0#d.data
#0#d.data
#2#d.data
#2#d.data
#4#d.data
#4#d.data
r2
#1#d.data
#1#d.data
#3#d.data
#3#d.data
#5#d.data
#5#d.data
In this case, r1 has only the fragments with indexes 0, 2 and 4 and r2
has the remaining indexes, but we need 4 unique indexes to decode. To
resolve this case, a composite ring which enables operator-oriented
location mapping [1] is under development.
1: https://review.openstack.org/#/c/271920/
- Efficient node iteration for read
Since EC fragment duplication requires a set of unique fragment indexes
to decode the original object, it needs more efficient node iteration
than the current implementation. Currently Swift iterates the nodes in
the order given by the sorting method defined in the proxy server config
(i.e. either shuffle, node_timing, or read_affinity). However, the
sorted result could include duplicate indexes among the first primaries
to connect to, even though \*we\* know more nodes are obviously needed
to get unique fragments. Hence, current Swift may frequently make more
than ec_ndata backend requests even when there are no node failures
among the object servers.
A possible solution is some refactoring work on NodeIter to provide
suitable nodes in the fragment duplication case, but this is still
under development.
--------------
Under the Hood
--------------


@ -84,7 +84,11 @@ aliases = yellow, orange
#ec_num_data_fragments = 10
#ec_num_parity_fragments = 4
#ec_object_segment_size = 1048576
#
# Duplication of EC fragments is proof-of-concept experimental support to enable
# Global Erasure Coding policies with multiple regions acting as independent
# failure domains. Do not change the default except in development/testing.
#ec_duplication_factor = 1
# The swift-constraints section sets the basic constraints on data
# saved in the swift cluster. These constraints are automatically


@ -20,7 +20,8 @@ import textwrap
import six
from six.moves.configparser import ConfigParser
from swift.common.utils import (
config_true_value, SWIFT_CONF_FILE, whataremyips, list_from_csv)
config_true_value, SWIFT_CONF_FILE, whataremyips, list_from_csv,
config_positive_int_value)
from swift.common.ring import Ring, RingData
from swift.common.utils import quorum_size
from swift.common.exceptions import RingLoadError
@ -406,7 +407,8 @@ class ECStoragePolicy(BaseStoragePolicy):
def __init__(self, idx, name='', aliases='', is_default=False,
is_deprecated=False, object_ring=None,
ec_segment_size=DEFAULT_EC_OBJECT_SEGMENT_SIZE,
ec_type=None, ec_ndata=None, ec_nparity=None):
ec_type=None, ec_ndata=None, ec_nparity=None,
ec_duplication_factor=1):
super(ECStoragePolicy, self).__init__(
idx=idx, name=name, aliases=aliases, is_default=is_default,
@ -489,6 +491,9 @@ class ECStoragePolicy(BaseStoragePolicy):
self._ec_ndata + self.pyeclib_driver.min_parity_fragments_needed()
self._fragment_size = None
self._ec_duplication_factor = \
config_positive_int_value(ec_duplication_factor)
@property
def ec_type(self):
return self._ec_type
@ -501,6 +506,10 @@ class ECStoragePolicy(BaseStoragePolicy):
def ec_nparity(self):
return self._ec_nparity
@property
def ec_n_unique_fragments(self):
return self._ec_ndata + self._ec_nparity
@property
def ec_segment_size(self):
return self._ec_segment_size
@ -538,11 +547,20 @@ class ECStoragePolicy(BaseStoragePolicy):
"""
return "%s %d+%d" % (self._ec_type, self._ec_ndata, self._ec_nparity)
@property
def ec_duplication_factor(self):
return self._ec_duplication_factor
def __repr__(self):
extra_info = ''
if self.ec_duplication_factor != 1:
extra_info = ', ec_duplication_factor=%d' % \
self.ec_duplication_factor
return ("%s, EC config(ec_type=%s, ec_segment_size=%d, "
"ec_ndata=%d, ec_nparity=%d)") % \
"ec_ndata=%d, ec_nparity=%d%s)") % \
(super(ECStoragePolicy, self).__repr__(), self.ec_type,
self.ec_segment_size, self.ec_ndata, self.ec_nparity)
self.ec_segment_size, self.ec_ndata, self.ec_nparity,
extra_info)
@classmethod
def _config_options_map(cls):
@ -552,6 +570,7 @@ class ECStoragePolicy(BaseStoragePolicy):
'ec_object_segment_size': 'ec_segment_size',
'ec_num_data_fragments': 'ec_ndata',
'ec_num_parity_fragments': 'ec_nparity',
'ec_duplication_factor': 'ec_duplication_factor',
})
return options
@ -562,13 +581,14 @@ class ECStoragePolicy(BaseStoragePolicy):
info.pop('ec_num_data_fragments')
info.pop('ec_num_parity_fragments')
info.pop('ec_type')
info.pop('ec_duplication_factor')
return info
@property
def quorum(self):
"""
Number of successful backend requests needed for the proxy to consider
the client request successful.
the client PUT request successful.
The quorum size for EC policies defines the minimum number
of data + parity elements required to be able to guarantee
@ -584,7 +604,7 @@ class ECStoragePolicy(BaseStoragePolicy):
for every erasure coding scheme, consult PyECLib for
min_parity_fragments_needed()
"""
return self._ec_quorum_size
return self._ec_quorum_size * self.ec_duplication_factor
def load_ring(self, swift_dir):
"""
@ -605,18 +625,35 @@ class ECStoragePolicy(BaseStoragePolicy):
considering the number of nodes in the primary list from the ring.
"""
nodes_configured = len(ring_data._replica2part2dev_id)
if nodes_configured != (self.ec_ndata + self.ec_nparity):
configured_fragment_count = len(ring_data._replica2part2dev_id)
required_fragment_count = \
(self.ec_n_unique_fragments) * self.ec_duplication_factor
if configured_fragment_count != required_fragment_count:
raise RingLoadError(
'EC ring for policy %s needs to be configured with '
'exactly %d replicas. Got %d.' % (
self.name, self.ec_ndata + self.ec_nparity,
nodes_configured))
self.name, required_fragment_count,
configured_fragment_count))
self.object_ring = Ring(
swift_dir, ring_name=self.ring_name,
validation_hook=validate_ring_data)
def get_backend_index(self, node_index):
"""
Backend index for PyECLib
:param node_index: integer of node index
:return: integer of actual fragment index. if param is not an integer,
return None instead
"""
try:
node_index = int(node_index)
except ValueError:
return None
return node_index % self.ec_n_unique_fragments
class StoragePolicyCollection(object):
"""


@ -345,6 +345,21 @@ def config_true_value(value):
(isinstance(value, six.string_types) and value.lower() in TRUE_VALUES)
def config_positive_int_value(value):
"""
Returns positive int value if it can be cast by int() and it's an
integer > 0. (not including zero) Raises ValueError otherwise.
"""
try:
value = int(value)
if value < 1:
raise ValueError()
except (TypeError, ValueError):
raise ValueError(
'Config option must be a positive int number, not "%s".' % value)
return value
def config_auto_int_value(value, default):
"""
Returns default if value is None or 'auto'.


@ -193,6 +193,8 @@ class ObjectReconstructor(Daemon):
return True
def _full_path(self, node, part, path, policy):
frag_index = (policy.get_backend_index(node['index'])
if 'index' in node else 'handoff')
return '%(replication_ip)s:%(replication_port)s' \
'/%(device)s/%(part)s%(path)s ' \
'policy#%(policy)d frag#%(frag_index)s' % {
@ -201,7 +203,7 @@ class ObjectReconstructor(Daemon):
'device': node['device'],
'part': part, 'path': path,
'policy': policy,
'frag_index': node.get('index', 'handoff'),
'frag_index': frag_index,
}
def _get_response(self, node, part, path, headers, policy):
@ -217,6 +219,7 @@ class ObjectReconstructor(Daemon):
:class:`~swift.common.storage_policy.BaseStoragePolicy`
:returns: response
"""
full_path = self._full_path(node, part, path, policy)
resp = None
try:
with ConnectionTimeout(self.conn_timeout):
@ -224,18 +227,18 @@ class ObjectReconstructor(Daemon):
part, 'GET', path, headers=headers)
with Timeout(self.node_timeout):
resp = conn.getresponse()
resp.full_path = full_path
if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
self.logger.warning(
_("Invalid response %(resp)s from %(full_path)s"),
{'resp': resp.status,
'full_path': self._full_path(node, part, path, policy)})
{'resp': resp.status, 'full_path': full_path})
resp = None
elif resp.status == HTTP_NOT_FOUND:
resp = None
except (Exception, Timeout):
self.logger.exception(
_("Trying to GET %(full_path)s"), {
'full_path': self._full_path(node, part, path, policy)})
'full_path': full_path})
return resp
def reconstruct_fa(self, job, node, datafile_metadata):
@ -259,7 +262,7 @@ class ObjectReconstructor(Daemon):
# the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list
fi_to_rebuild = node['index']
fi_to_rebuild = job['policy'].get_backend_index(node['index'])
# KISS send out connection requests to all nodes, see what sticks.
# Use fragment preferences header to tell other nodes that we want
@ -272,40 +275,96 @@ class ObjectReconstructor(Daemon):
headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
pile = GreenAsyncPile(len(part_nodes))
path = datafile_metadata['name']
for node in part_nodes:
pile.spawn(self._get_response, node, job['partition'],
for _node in part_nodes:
pile.spawn(self._get_response, _node, job['partition'],
path, headers, job['policy'])
responses = []
etag = None
buckets = defaultdict(dict)
etag_buckets = {}
error_resp_count = 0
for resp in pile:
if not resp:
error_resp_count += 1
continue
resp.headers = HeaderKeyDict(resp.getheaders())
if str(fi_to_rebuild) == \
resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index'):
frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
try:
unique_index = int(frag_index)
except (TypeError, ValueError):
# A successful response should include a valid X-Object-
# Sysmeta-Ec-Frag-Index, but for safety we catch the cases of a
# missing or invalid frag index header and log a warning.
self.logger.warning(
'Invalid resp from %s '
'(invalid X-Object-Sysmeta-Ec-Frag-Index)',
resp.full_path)
continue
if resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') in set(
r.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
for r in responses):
continue
responses.append(resp)
etag = sorted(responses, reverse=True,
key=lambda r: Timestamp(
r.headers.get('X-Backend-Timestamp')
))[0].headers.get('X-Object-Sysmeta-Ec-Etag')
responses = [r for r in responses if
r.headers.get('X-Object-Sysmeta-Ec-Etag') == etag]
if len(responses) >= job['policy'].ec_ndata:
break
else:
self.logger.error(
'Unable to get enough responses (%s/%s) '
'to reconstruct %s with ETag %s' % (
len(responses), job['policy'].ec_ndata,
if fi_to_rebuild == unique_index:
# TODO: With duplicated EC frags it's not unreasonable to find
# the very fragment we're trying to rebuild exists on another
# primary node. In this case we should stream it directly from
# the remote node to our target instead of rebuild. But
# instead we ignore it.
self.logger.debug(
'Found existing frag #%s while rebuilding #%s from %s',
unique_index, fi_to_rebuild, self._full_path(
node, job['partition'], datafile_metadata['name'],
job['policy']))
continue
timestamp = resp.headers.get('X-Backend-Timestamp')
if not timestamp:
self.logger.warning(
'Invalid resp from %s (missing X-Backend-Timestamp)',
resp.full_path)
continue
timestamp = Timestamp(timestamp)
etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
if not etag:
self.logger.warning('Invalid resp from %s (missing Etag)',
resp.full_path)
continue
if etag != etag_buckets.setdefault(timestamp, etag):
self.logger.error(
'Mixed Etag (%s, %s) for %s',
etag, etag_buckets[timestamp],
self._full_path(node, job['partition'],
datafile_metadata['name'], job['policy']),
etag))
datafile_metadata['name'], job['policy']))
continue
if unique_index not in buckets[timestamp]:
buckets[timestamp][unique_index] = resp
if len(buckets[timestamp]) >= job['policy'].ec_ndata:
responses = buckets[timestamp].values()
self.logger.debug(
'Reconstruct frag #%s with frag indexes %s'
% (fi_to_rebuild, list(buckets[timestamp])))
break
else:
for timestamp, resp in sorted(buckets.items()):
etag = etag_buckets[timestamp]
self.logger.error(
'Unable to get enough responses (%s/%s) '
'to reconstruct %s with ETag %s' % (
len(resp), job['policy'].ec_ndata,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy']),
etag))
if error_resp_count:
self.logger.error(
'Unable to get enough responses (%s error responses) '
'to reconstruct %s' % (
error_resp_count,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy'])))
raise DiskFileError('Unable to reconstruct EC archive')
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
@ -696,7 +755,7 @@ class ObjectReconstructor(Daemon):
A partition may result in multiple jobs. Potentially many
REVERT jobs, and zero or one SYNC job.
:param local_dev: the local device
:param local_dev: the local device (node dict)
:param part_path: full path to partition
:param partition: partition number
:param policy: the policy
@ -756,7 +815,7 @@ class ObjectReconstructor(Daemon):
for node in part_nodes:
if node['id'] == local_dev['id']:
# this partition belongs here, we'll need a sync job
frag_index = node['index']
frag_index = policy.get_backend_index(node['index'])
try:
suffixes = data_fi_to_suffixes.pop(frag_index)
except KeyError:
@ -765,7 +824,7 @@ class ObjectReconstructor(Daemon):
job_type=SYNC,
frag_index=frag_index,
suffixes=suffixes,
sync_to=_get_partners(frag_index, part_nodes),
sync_to=_get_partners(node['index'], part_nodes),
)
# ssync callback to rebuild missing fragment_archives
sync_job['sync_diskfile_builder'] = self.reconstruct_fa
@ -776,11 +835,21 @@ class ObjectReconstructor(Daemon):
ordered_fis = sorted((len(suffixes), fi) for fi, suffixes
in data_fi_to_suffixes.items())
for count, fi in ordered_fis:
# In single region EC a revert job must sync to the specific
# primary whose node_index matches the data's frag_index. With
# duplicated EC frags a revert job must sync to all primary nodes
# that should be holding this frag_index.
nodes_sync_to = []
node_index = fi
for n in range(policy.ec_duplication_factor):
nodes_sync_to.append(part_nodes[node_index])
node_index += policy.ec_n_unique_fragments
revert_job = build_job(
job_type=REVERT,
frag_index=fi,
suffixes=data_fi_to_suffixes[fi],
sync_to=[part_nodes[fi]],
sync_to=nodes_sync_to,
)
jobs.append(revert_job)


@ -1775,8 +1775,9 @@ def chunk_transformer(policy, nstreams):
frags_by_byte_order = []
for chunk_to_encode in chunks_to_encode:
frags_by_byte_order.append(
policy.pyeclib_driver.encode(chunk_to_encode))
encoded_chunks = policy.pyeclib_driver.encode(chunk_to_encode)
send_chunks = encoded_chunks * policy.ec_duplication_factor
frags_by_byte_order.append(send_chunks)
# Sequential calls to encode() have given us a list that
# looks like this:
#
@ -1801,7 +1802,7 @@ def chunk_transformer(policy, nstreams):
last_bytes = ''.join(buf)
if last_bytes:
last_frags = policy.pyeclib_driver.encode(last_bytes)
yield last_frags
yield last_frags * policy.ec_duplication_factor
else:
yield [''] * nstreams
@ -2178,6 +2179,7 @@ class ECObjectController(BaseObjectController):
range_specs = self._convert_range(req, policy)
safe_iter = GreenthreadSafeIterator(node_iter)
# Sending the request concurrently to all nodes, and responding
# with the first response isn't something useful for EC as all
# nodes contain different fragments. Also EC has implemented it's
@ -2204,8 +2206,11 @@ class ECObjectController(BaseObjectController):
# getters in case some unforeseen scenario, or a misbehaving object
# server, causes us to otherwise make endless requests e.g. if an
# object server were to ignore frag_prefs and always respond with
# a frag that is already in a bucket.
max_extra_requests = 2 * policy.ec_nparity + policy.ec_ndata
# a frag that is already in a bucket. Now we assume the total should
# be limited to at most 2 * replicas.
max_extra_requests = (
(policy.object_ring.replica_count * 2) - policy.ec_ndata)
for get, parts_iter in pile:
if get.last_status is None:
# We may have spawned getters that find the node iterator
@ -2322,7 +2327,7 @@ class ECObjectController(BaseObjectController):
logger=self.app.logger,
need_multiphase=True)
def _determine_chunk_destinations(self, putters):
def _determine_chunk_destinations(self, putters, policy):
"""
Given a list of putters, return a dict where the key is the putter
and the value is the node index to use.
@ -2331,6 +2336,10 @@ class ECObjectController(BaseObjectController):
(in the primary part list) as the primary that the handoff is standing
in for. This lets erasure-code fragment archives wind up on the
preferred local primary nodes when possible.
:param putters: a list of swift.proxy.controllers.obj.MIMEPutter
instances
:param policy: A policy instance
"""
# Give each putter a "chunk index": the index of the
# transformed chunk that we'll send to it.
@ -2351,9 +2360,34 @@ class ECObjectController(BaseObjectController):
# nodes. Holes occur when a storage node is down, in which
# case the connection is not replaced, and when a storage node
# returns 507, in which case a handoff is used to replace it.
holes = [x for x in range(len(putters))
if x not in chunk_index.values()]
# lack_list is a dict of list to keep hole indexes
# e.g. if we have 2 holes for index 0 with ec_duplication_factor=2
# lack_list is like {0: [0], 1: [0]}, and then, if 1 hole found
# for index 1, lack_list will be {0: [0, 1], 1: [0]}.
# After that, holes will be filled from bigger key
# (i.e. 1:[0] at first)
# Grouping all missing fragment indexes for each unique_index
unique_index_to_holes = collections.defaultdict(list)
available_indexes = chunk_index.values()
for node_index in range(policy.object_ring.replica_count):
if node_index not in available_indexes:
unique_index = policy.get_backend_index(node_index)
unique_index_to_holes[unique_index].append(node_index)
# Set the missing index to lack_list
lack_list = collections.defaultdict(list)
for unique_index, holes in unique_index_to_holes.items():
for lack_tier, hole_node_index in enumerate(holes):
lack_list[lack_tier].append(hole_node_index)
# Extract the lack_list to a flat list
holes = []
for lack_tier, indexes in sorted(lack_list.items(), reverse=True):
holes.extend(indexes)
# Fill chunk_index list with the hole list
for hole, p in zip(holes, handoff_conns):
chunk_index[p] = hole
return chunk_index
@ -2405,7 +2439,8 @@ class ECObjectController(BaseObjectController):
# build our chunk index dict to place handoffs in the
# same part nodes index as the primaries they are covering
chunk_index = self._determine_chunk_destinations(putters)
chunk_index = self._determine_chunk_destinations(
putters, policy)
for putter in putters:
putter.spawn_sender_greenthread(
@ -2456,7 +2491,7 @@ class ECObjectController(BaseObjectController):
# Update any footers set by middleware with EC footers
trail_md = trailing_metadata(
policy, etag_hasher,
bytes_transferred, ci)
bytes_transferred, policy.get_backend_index(ci))
trail_md.update(footers)
# Etag footer must always be hash of what we sent
trail_md['Etag'] = chunk_hashers[ci].hexdigest()


@ -21,6 +21,7 @@ import copy
import logging
import errno
from six.moves import range
from six import BytesIO
import sys
from contextlib import contextmanager, closing
from collections import defaultdict, Iterable
@ -34,13 +35,14 @@ from tempfile import mkdtemp
from shutil import rmtree
import signal
import json
import random
from swift.common.utils import Timestamp, NOTICE
from test import get_config
from swift.common import utils
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.ring import Ring, RingData
from swift.obj import server
from hashlib import md5
import logging.handlers
@ -48,6 +50,7 @@ from six.moves.http_client import HTTPException
from swift.common import storage_policy
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
VALID_EC_TYPES)
from swift.common import swob
import functools
import six.moves.cPickle as pickle
from gzip import GzipFile
@ -213,6 +216,7 @@ class FakeRing(Ring):
self._base_port = base_port
self.max_more_nodes = max_more_nodes
self._part_shift = 32 - part_power
self._init_device_char()
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
# this is set higher, or R^2 for R replicas
self.set_replicas(replicas)
@ -221,9 +225,18 @@ class FakeRing(Ring):
def _reload(self):
self._rtime = time.time()
@property
def device_char(self):
return next(self._device_char_iter)
def _init_device_char(self):
self._device_char_iter = itertools.cycle(
['sd%s' % chr(ord('a') + x) for x in range(26)])
def set_replicas(self, replicas):
self.replicas = replicas
self._devs = []
self._init_device_char()
for x in range(self.replicas):
ip = '10.0.0.%s' % x
port = self._base_port + x
@ -233,7 +246,7 @@ class FakeRing(Ring):
'replication_ip': ip,
'port': port,
'replication_port': port,
'device': 'sd' + (chr(ord('a') + x)),
'device': self.device_char,
'zone': x % 3,
'region': x % 2,
'id': x,
@ -290,7 +303,7 @@ class FabricatedRing(Ring):
self.devices = devices
self.nodes = nodes
self.port = port
self.replicas = 6
self.replicas = replicas
self.part_power = part_power
self._part_shift = 32 - self.part_power
self._reload()
@ -1092,6 +1105,30 @@ def requires_o_tmpfile_support(func):
return wrapper
class StubResponse(object):
def __init__(self, status, body='', headers=None, frag_index=None):
self.status = status
self.body = body
self.readable = BytesIO(body)
self.headers = HeaderKeyDict(headers)
if frag_index is not None:
self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
fake_reason = ('Fake', 'This response is a lie.')
self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
def getheader(self, header_name, default=None):
return self.headers.get(header_name, default)
def getheaders(self):
if 'Content-Length' not in self.headers:
self.headers['Content-Length'] = len(self.body)
return self.headers.items()
def read(self, amt=0):
return self.readable.read(amt)
def encode_frag_archive_bodies(policy, body):
"""
Given a stub body produce a list of complete frag_archive bodies as
@ -1119,3 +1156,128 @@ def encode_frag_archive_bodies(policy, body):
ec_archive_bodies = [''.join(frags)
for frags in zip(*fragment_payloads)]
return ec_archive_bodies
def make_ec_object_stub(test_body, policy, timestamp):
segment_size = policy.ec_segment_size
test_body = test_body or (
'test' * segment_size)[:-random.randint(1, 1000)]
timestamp = timestamp or utils.Timestamp(time.time())
etag = md5(test_body).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(policy, test_body)
return {
'body': test_body,
'etag': etag,
'frags': ec_archive_bodies,
'timestamp': timestamp
}
def fake_ec_node_response(node_frags, policy):
"""
Given a list of entries for each node in ring order, where the entries
are a dict (or list of dicts) which describes the fragment (or
fragments) that are on the node; create a function suitable for use
with capture_http_requests that will accept a req object and return a
response that will suitably fake the behavior of an object server who
had the given fragments on disk at the time.
:param node_frags: a list. Each item in the list describes the
fragments that are on a node; each item is a dict or list of dicts,
each dict describing a single fragment; where the item is a list,
repeated calls to get_response will return fragments in the order
of the list; each dict has keys:
- obj: an object stub, as generated by _make_ec_object_stub,
that defines all of the fragments that compose an object
at a specific timestamp.
- frag: the index of a fragment to be selected from the object
stub
- durable (optional): True if the selected fragment is durable
:param policy: storage policy to return
"""
node_map = {} # maps node ip and port to node index
all_nodes = []
call_count = {} # maps node index to get_response call count for node
def _build_node_map(req, policy):
node_key = lambda n: (n['ip'], n['port'])
part = utils.split_path(req['path'], 5, 5, True)[1]
all_nodes.extend(policy.object_ring.get_part_nodes(part))
all_nodes.extend(policy.object_ring.get_more_nodes(part))
for i, node in enumerate(all_nodes):
node_map[node_key(node)] = i
call_count[i] = 0
# normalize node_frags to a list of fragments for each node even
# if there's only one fragment in the dataset provided.
for i, frags in enumerate(node_frags):
if isinstance(frags, dict):
node_frags[i] = [frags]
def get_response(req):
requested_policy = int(
req['headers']['X-Backend-Storage-Policy-Index'])
if int(policy) != requested_policy:
raise AssertionError(
"Requested policy doesn't fit the fake response policy")
if not node_map:
_build_node_map(req, policy)
try:
node_index = node_map[(req['ip'], req['port'])]
except KeyError:
raise Exception("Couldn't find node %s:%s in %r" % (
req['ip'], req['port'], all_nodes))
try:
frags = node_frags[node_index]
except IndexError:
raise Exception('Found node %r:%r at index %s - '
'but only got %s stub response nodes' % (
req['ip'], req['port'], node_index,
len(node_frags)))
if not frags:
return StubResponse(404)
# determine response fragment (if any) for this call
resp_frag = frags[call_count[node_index]]
call_count[node_index] += 1
frag_prefs = req['headers'].get('X-Backend-Fragment-Preferences')
if not (frag_prefs or resp_frag.get('durable', True)):
return StubResponse(404)
# prepare durable timestamp and backend frags header for this node
obj_stub = resp_frag['obj']
ts2frags = defaultdict(list)
durable_timestamp = None
for frag in frags:
ts_frag = frag['obj']['timestamp']
if frag.get('durable', True):
durable_timestamp = ts_frag.internal
ts2frags[ts_frag].append(frag['frag'])
try:
body = obj_stub['frags'][resp_frag['frag']]
except IndexError as err:
raise Exception(
'Frag index %s not defined: node index %s, frags %r\n%s' %
(resp_frag['frag'], node_index, [f['frag'] for f in frags],
err))
headers = {
'X-Object-Sysmeta-Ec-Content-Length': len(obj_stub['body']),
'X-Object-Sysmeta-Ec-Etag': obj_stub['etag'],
'X-Object-Sysmeta-Ec-Frag-Index':
policy.get_backend_index(resp_frag['frag']),
'X-Backend-Timestamp': obj_stub['timestamp'].internal,
'X-Timestamp': obj_stub['timestamp'].normal,
'X-Backend-Data-Timestamp': obj_stub['timestamp'].internal,
'X-Backend-Fragments':
server._make_backend_fragments_header(ts2frags)
}
if durable_timestamp:
headers['X-Backend-Durable-Timestamp'] = durable_timestamp
return StubResponse(200, body, headers)
return get_response


@ -19,6 +19,7 @@ import unittest
import os
import mock
from functools import partial
from six.moves.configparser import ConfigParser
from tempfile import NamedTemporaryFile
from test.unit import patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE
@ -169,7 +170,11 @@ class TestStoragePolicies(unittest.TestCase):
StoragePolicy(2, 'cee', False),
ECStoragePolicy(10, 'ten',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3)]
ec_ndata=10, ec_nparity=3),
ECStoragePolicy(11, 'eleven',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3,
ec_duplication_factor=2)]
policies = StoragePolicyCollection(test_policies)
for policy in policies:
policy_repr = repr(policy)
@ -185,6 +190,10 @@ class TestStoragePolicies(unittest.TestCase):
policy.ec_nparity in policy_repr)
self.assertTrue('ec_segment_size=%s' %
policy.ec_segment_size in policy_repr)
if policy.ec_duplication_factor > 1:
self.assertTrue('ec_duplication_factor=%s' %
policy.ec_duplication_factor in
policy_repr)
collection_repr = repr(policies)
collection_repr_lines = collection_repr.splitlines()
self.assertTrue(
@ -443,16 +452,21 @@ class TestStoragePolicies(unittest.TestCase):
ECStoragePolicy(0, 'ec8-2', aliases='zeus, jupiter',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=8, ec_nparity=2,
object_ring=FakeRing(replicas=8),
object_ring=FakeRing(replicas=10),
is_default=True),
ECStoragePolicy(1, 'ec10-4', aliases='athena, minerva',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
object_ring=FakeRing(replicas=10)),
object_ring=FakeRing(replicas=14)),
ECStoragePolicy(2, 'ec4-2', aliases='poseidon, neptune',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2,
object_ring=FakeRing(replicas=7)),
object_ring=FakeRing(replicas=6)),
ECStoragePolicy(3, 'ec4-2-dup', aliases='uzuki, rin',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2,
ec_duplication_factor=2,
object_ring=FakeRing(replicas=12)),
]
ec_policies = StoragePolicyCollection(good_test_policies_EC)
@ -460,6 +474,10 @@ class TestStoragePolicies(unittest.TestCase):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[0])
for name in ('ec10-4', 'athena', 'minerva'):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[1])
for name in ('ec4-2', 'poseidon', 'neptune'):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[2])
for name in ('ec4-2-dup', 'uzuki', 'rin'):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[3])
# Testing parsing of conf files/text
good_ec_conf = self._conf("""
@ -478,6 +496,14 @@ class TestStoragePolicies(unittest.TestCase):
ec_type = %(ec_type)s
ec_num_data_fragments = 10
ec_num_parity_fragments = 4
[storage-policy:2]
name = ec4-2-dup
aliases = uzuki, rin
policy_type = erasure_coding
ec_type = %(ec_type)s
ec_num_data_fragments = 4
ec_num_parity_fragments = 2
ec_duplication_factor = 2
""" % {'ec_type': DEFAULT_TEST_EC_TYPE})
ec_policies = parse_storage_policies(good_ec_conf)
@ -485,6 +511,8 @@ class TestStoragePolicies(unittest.TestCase):
ec_policies[0])
self.assertEqual(ec_policies.get_by_name('ec10-4'),
ec_policies.get_by_name('poseidon'))
self.assertEqual(ec_policies.get_by_name('ec4-2-dup'),
ec_policies.get_by_name('uzuki'))
name_repeat_ec_conf = self._conf("""
[storage-policy:0]
@ -1243,11 +1271,16 @@ class TestStoragePolicies(unittest.TestCase):
ec_ndata=8, ec_nparity=2),
ECStoragePolicy(11, 'df10-6', ec_type='flat_xor_hd_4',
ec_ndata=10, ec_nparity=6),
ECStoragePolicy(12, 'ec4-2-dup', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2, ec_duplication_factor=2),
]
for ec_policy in test_ec_policies:
k = ec_policy.ec_ndata
expected_size = \
k + ec_policy.pyeclib_driver.min_parity_fragments_needed()
expected_size = (
(k + ec_policy.pyeclib_driver.min_parity_fragments_needed())
* ec_policy.ec_duplication_factor
)
self.assertEqual(expected_size, ec_policy.quorum)
def test_validate_ring(self):
@ -1259,25 +1292,27 @@ class TestStoragePolicies(unittest.TestCase):
ec_ndata=10, ec_nparity=4),
ECStoragePolicy(2, 'ec4-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2),
ECStoragePolicy(3, 'ec4-2-2dup', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2,
ec_duplication_factor=2)
]
actual_load_ring_replicas = [8, 10, 7]
actual_load_ring_replicas = [8, 10, 7, 11]
policies = StoragePolicyCollection(test_policies)
def create_mock_ring_data(num_replica):
class mock_ring_data_klass(object):
def __init__(self):
self._replica2part2dev_id = [0] * num_replica
return mock_ring_data_klass()
class MockRingData(object):
def __init__(self, num_replica):
self._replica2part2dev_id = [0] * num_replica
for policy, ring_replicas in zip(policies, actual_load_ring_replicas):
with mock.patch('swift.common.ring.ring.RingData.load',
return_value=create_mock_ring_data(ring_replicas)):
return_value=MockRingData(ring_replicas)):
necessary_replica_num = \
policy.ec_n_unique_fragments * policy.ec_duplication_factor
with mock.patch(
'swift.common.ring.ring.validate_configuration'):
msg = 'EC ring for policy %s needs to be configured with ' \
'exactly %d replicas.' % \
(policy.name, policy.ec_ndata + policy.ec_nparity)
(policy.name, necessary_replica_num)
self.assertRaisesWithMessage(RingLoadError, msg,
policy.load_ring, 'mock')
@ -1332,6 +1367,7 @@ class TestStoragePolicies(unittest.TestCase):
'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
'ec_duplication_factor': 1,
},
(10, False): {
'name': 'ten',
@ -1348,12 +1384,30 @@ class TestStoragePolicies(unittest.TestCase):
'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
'ec_duplication_factor': 1,
},
(11, False): {
'name': 'done',
'aliases': 'done',
'deprecated': True,
},
# enabled ec with ec_duplication
(12, True): {
'name': 'twelve',
'aliases': 'twelve',
'default': False,
'deprecated': False,
'policy_type': EC_POLICY,
'ec_type': DEFAULT_TEST_EC_TYPE,
'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
'ec_duplication_factor': 2,
},
(12, False): {
'name': 'twelve',
'aliases': 'twelve',
},
}
self.maxDiff = None
for policy in policies:


@ -2577,6 +2577,33 @@ cluster_dfw1 = http://dfw1.host/v1/
finally:
utils.TRUE_VALUES = orig_trues
def test_config_positive_int_value(self):
expectations = {
# value : expected,
'1': 1,
1: 1,
'2': 2,
'1024': 1024,
'0x01': ValueError,
'asdf': ValueError,
None: ValueError,
0: ValueError,
-1: ValueError,
'1.2': ValueError, # a string expressing a float should be a ValueError
}
for value, expected in expectations.items():
try:
rv = utils.config_positive_int_value(value)
except Exception as e:
if e.__class__ is not expected:
raise
else:
self.assertEqual(
'Config option must be a positive int number, '
'not "%s".' % value, e.message)
else:
self.assertEqual(expected, rv)
def test_config_auto_int_value(self):
expectations = {
# (value, default) : expected,


@ -84,7 +84,8 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
mkdirs(_testdir)
rmtree(_testdir)
for drive in ('sda1', 'sdb1', 'sdc1', 'sdd1', 'sde1',
'sdf1', 'sdg1', 'sdh1', 'sdi1'):
'sdf1', 'sdg1', 'sdh1', 'sdi1', 'sdj1',
'sdk1', 'sdl1'):
mkdirs(os.path.join(_testdir, drive, 'tmp'))
conf = {'devices': _testdir, 'swift_dir': _testdir,
'mount_check': 'false', 'allowed_headers':
@ -100,9 +101,13 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
obj1lis = listen(('localhost', 0))
obj2lis = listen(('localhost', 0))
obj3lis = listen(('localhost', 0))
objsocks = [obj1lis, obj2lis, obj3lis]
obj4lis = listen(('localhost', 0))
obj5lis = listen(('localhost', 0))
obj6lis = listen(('localhost', 0))
objsocks = [obj1lis, obj2lis, obj3lis, obj4lis, obj5lis, obj6lis]
context["test_sockets"] = \
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis)
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis,
obj4lis, obj5lis, obj6lis)
account_ring_path = os.path.join(_testdir, 'account.ring.gz')
account_devs = [
{'port': acc1lis.getsockname()[1]},
@ -120,7 +125,10 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
StoragePolicy(1, 'one', False),
StoragePolicy(2, 'two', False),
ECStoragePolicy(3, 'ec', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096)])
ec_ndata=2, ec_nparity=1, ec_segment_size=4096),
ECStoragePolicy(4, 'ec-dup', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096,
ec_duplication_factor=2)])
obj_rings = {
0: ('sda1', 'sdb1'),
1: ('sdc1', 'sdd1'),
@ -136,22 +144,41 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
write_fake_ring(obj_ring_path, *obj_devs)
# write_fake_ring can't handle a 3-element ring, and the EC policy needs
# at least 3 devs to work with, so we do it manually
# at least 6 devs to work with (ec_k=2, ec_m=1, duplication_factor=2),
# so we do it manually
devs = [{'id': 0, 'zone': 0, 'device': 'sdg1', 'ip': '127.0.0.1',
'port': obj1lis.getsockname()[1]},
{'id': 1, 'zone': 0, 'device': 'sdh1', 'ip': '127.0.0.1',
'port': obj2lis.getsockname()[1]},
{'id': 2, 'zone': 0, 'device': 'sdi1', 'ip': '127.0.0.1',
'port': obj3lis.getsockname()[1]}]
'port': obj3lis.getsockname()[1]},
{'id': 3, 'zone': 0, 'device': 'sdj1', 'ip': '127.0.0.1',
'port': obj4lis.getsockname()[1]},
{'id': 4, 'zone': 0, 'device': 'sdk1', 'ip': '127.0.0.1',
'port': obj5lis.getsockname()[1]},
{'id': 5, 'zone': 0, 'device': 'sdl1', 'ip': '127.0.0.1',
'port': obj6lis.getsockname()[1]}]
pol3_replica2part2dev_id = [[0, 1, 2, 0],
[1, 2, 0, 1],
[2, 0, 1, 2]]
pol4_replica2part2dev_id = [[0, 1, 2, 3],
[1, 2, 3, 4],
[2, 3, 4, 5],
[3, 4, 5, 0],
[4, 5, 0, 1],
[5, 0, 1, 2]]
obj3_ring_path = os.path.join(
_testdir, storage_policy.POLICIES[3].ring_name + '.ring.gz')
part_shift = 30
with closing(GzipFile(obj3_ring_path, 'wb')) as fh:
pickle.dump(RingData(pol3_replica2part2dev_id, devs, part_shift), fh)
obj4_ring_path = os.path.join(
_testdir, storage_policy.POLICIES[4].ring_name + '.ring.gz')
part_shift = 30
with closing(GzipFile(obj4_ring_path, 'wb')) as fh:
pickle.dump(RingData(pol4_replica2part2dev_id, devs, part_shift), fh)
prosrv = proxy_server.Application(conf, logger=debug_logger('proxy'))
for policy in storage_policy.POLICIES:
# make sure all the rings are loaded
@ -172,8 +199,15 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
conf, logger=debug_logger('obj2'))
obj3srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj3'))
obj4srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj4'))
obj5srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj5'))
obj6srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj6'))
context["test_servers"] = \
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv, obj3srv)
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv, obj3srv,
obj4srv, obj5srv, obj6srv)
nl = NullLogger()
logging_prosv = proxy_logging.ProxyLoggingMiddleware(prosrv, conf,
logger=prosrv.logger)
@ -185,8 +219,12 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
obj1spa = spawn(wsgi.server, obj1lis, obj1srv, nl)
obj2spa = spawn(wsgi.server, obj2lis, obj2srv, nl)
obj3spa = spawn(wsgi.server, obj3lis, obj3srv, nl)
obj4spa = spawn(wsgi.server, obj4lis, obj4srv, nl)
obj5spa = spawn(wsgi.server, obj5lis, obj5srv, nl)
obj6spa = spawn(wsgi.server, obj6lis, obj6srv, nl)
context["test_coros"] = \
(prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa, obj3spa)
(prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa, obj3spa,
obj4spa, obj5spa, obj6spa)
# Create account
ts = normalize_timestamp(time.time())
partition, nodes = prosrv.account_ring.get_nodes('a')


@ -25,6 +25,7 @@ import shutil
import re
import random
import struct
import collections
from eventlet import Timeout, sleep
from contextlib import closing, contextmanager
@ -72,7 +73,8 @@ def make_ec_archive_bodies(policy, test_body):
# encode the buffers into fragment payloads
fragment_payloads = []
for chunk in chunks:
fragments = policy.pyeclib_driver.encode(chunk)
fragments = \
policy.pyeclib_driver.encode(chunk) * policy.ec_duplication_factor
if not fragments:
break
fragment_payloads.append(fragments)
@ -664,13 +666,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_get_response(self):
part = self.part_nums[0]
node = POLICIES[0].object_ring.get_part_nodes(int(part))[0]
node = POLICIES[1].object_ring.get_part_nodes(int(part))[0]
for stat_code in (200, 400):
with mocked_http_conn(stat_code):
resp = self.reconstructor._get_response(node, part,
path='nada',
headers={},
policy=POLICIES[0])
policy=POLICIES[1])
if resp:
self.assertEqual(resp.status, 200)
else:
@ -679,12 +681,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_reconstructor_does_not_log_on_404(self):
part = self.part_nums[0]
node = POLICIES[0].object_ring.get_part_nodes(int(part))[0]
node = POLICIES[1].object_ring.get_part_nodes(int(part))[0]
with mocked_http_conn(404):
self.reconstructor._get_response(node, part,
path='some_path',
headers={},
policy=POLICIES[0])
policy=POLICIES[1])
# Make sure that no warnings are emitted for a 404
len_warning_lines = len(self.logger.get_lines_for_level('warning'))
@ -1160,6 +1162,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.policy.object_ring.max_more_nodes = \
self.policy.object_ring.replicas
self.ts_iter = make_timestamp_iter()
self.fabricated_ring = FabricatedRing()
def _configure_reconstructor(self, **kwargs):
self.conf.update(kwargs)
@ -1716,7 +1719,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['device'], self.local_dev['device'])
def test_build_jobs_primary(self):
ring = self.policy.object_ring = FabricatedRing()
ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a primary
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
@ -1761,7 +1764,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['device'], self.local_dev['device'])
def test_build_jobs_handoff(self):
ring = self.policy.object_ring = FabricatedRing()
ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a handoff
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
@ -1780,7 +1783,8 @@ class TestObjectReconstructor(unittest.TestCase):
}
# since this part doesn't belong on us it doesn't matter what
# frag_index we have
frag_index = random.randint(0, ring.replicas - 1)
frag_index = self.policy.get_backend_index(
random.randint(0, ring.replicas - 1))
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {None: 'hash'},
@ -1793,7 +1797,17 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['job_type'], object_reconstructor.REVERT)
self.assertEqual(job['frag_index'], frag_index)
self.assertEqual(sorted(job['suffixes']), sorted(stub_hashes.keys()))
self.assertEqual(len(job['sync_to']), 1)
self.assertEqual(
self.policy.ec_duplication_factor, len(job['sync_to']))
# the sync_to nodes should be different from each other
node_ids = set([node['id'] for node in job['sync_to']])
self.assertEqual(len(node_ids),
self.policy.ec_duplication_factor)
# but all the nodes have the same backend index to sync
node_indexes = set(
self.policy.get_backend_index(node['index'])
for node in job['sync_to'])
self.assertEqual(1, len(node_indexes))
self.assertEqual(job['sync_to'][0]['index'], frag_index)
self.assertEqual(job['path'], part_path)
self.assertEqual(job['partition'], partition)
@ -1801,12 +1815,12 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['local_dev'], self.local_dev)
def test_build_jobs_mixed(self):
ring = self.policy.object_ring = FabricatedRing()
ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a primary
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
try:
frag_index = [n['id'] for n in part_nodes].index(
node_index = [n['id'] for n in part_nodes].index(
self.local_dev['id'])
except ValueError:
pass
@ -1823,8 +1837,10 @@ class TestObjectReconstructor(unittest.TestCase):
'partition': partition,
'part_path': part_path,
}
other_frag_index = random.choice([f for f in range(ring.replicas)
if f != frag_index])
frag_index = self.policy.get_backend_index(node_index)
other_frag_index = self.policy.get_backend_index(
random.choice(
[f for f in range(ring.replicas) if f != node_index]))
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'456': {other_frag_index: 'hash', None: 'hash'},
@ -1858,11 +1874,12 @@ class TestObjectReconstructor(unittest.TestCase):
job = revert_jobs[0]
self.assertEqual(job['frag_index'], other_frag_index)
self.assertEqual(job['suffixes'], ['456'])
self.assertEqual(len(job['sync_to']), 1)
self.assertEqual(len(job['sync_to']),
self.policy.ec_duplication_factor)
self.assertEqual(job['sync_to'][0]['index'], other_frag_index)
def test_build_jobs_revert_only_tombstones(self):
ring = self.policy.object_ring = FabricatedRing()
ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a handoff
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
@ -1945,7 +1962,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_primary_in_sync(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
@ -2003,7 +2021,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_primary_not_in_sync(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
@ -2065,7 +2084,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_sync_missing_durable(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
@ -2133,7 +2153,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_primary_some_in_sync(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
@ -2200,9 +2221,9 @@ class TestObjectReconstructor(unittest.TestCase):
self.fail('unexpected call %r' % call)
def test_process_job_primary_down(self):
replicas = self.policy.object_ring.replicas
partition = 0
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
@ -2269,9 +2290,9 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(expected_ssync_calls, found_ssync_calls)
def test_process_job_suffix_call_errors(self):
replicas = self.policy.object_ring.replicas
partition = 0
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
@ -2318,8 +2339,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertFalse(ssync_calls)
def test_process_job_handoff(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
@ -2364,8 +2385,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
def test_process_job_will_not_revert_to_handoff(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
@ -2423,8 +2444,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
def test_process_job_revert_is_handoff_fails(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
@ -2482,8 +2503,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(self.reconstructor.handoffs_remaining, 1)
def test_process_job_revert_cleanup(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
@ -2616,7 +2637,6 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
responses = list()
@ -2662,6 +2682,9 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(
[{'timestamp': '1234567890.12345', 'exclude': []}],
json.loads(called_header['X-Backend-Fragment-Preferences']))
# no error or warning logs
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_errors_works(self):
job = {
@ -2705,6 +2728,57 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_fa_error_with_invalid_header(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(4)
base_responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
base_responses.append((200, body, headers))
responses = base_responses
# force the test to exercise the handling of this bad response by
# sticking it in near the front
error_index = random.randint(0, self.policy.ec_ndata - 1)
status, body, headers = responses[error_index]
# one esoteric failure is a literal string 'None' in place of the
# X-Object-Sysmeta-EC-Frag-Index
stub_node_job = {'some_keys': 'foo', 'but_not': 'frag_index'}
headers['X-Object-Sysmeta-Ec-Frag-Index'] = str(
stub_node_job.get('frag_index'))
# oops!
self.assertEqual('None',
headers.get('X-Object-Sysmeta-Ec-Frag-Index'))
responses[error_index] = status, body, headers
codes, body_iter, headers_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
# ... this bad response should be treated like any other failure
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_parity_fa_with_data_node_failure(self):
job = {
'partition': 0,
@@ -2724,7 +2798,6 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
# the scheme is 10+4, so this gets a parity node
broken_body = ec_archive_bodies.pop(-4)
@@ -2748,7 +2821,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_fa_errors_fails(self):
def test_reconstruct_fa_exceptions_fails(self):
job = {
'partition': 0,
'policy': self.policy,
@@ -2763,12 +2836,53 @@ class TestObjectReconstructor(unittest.TestCase):
'X-Timestamp': '1234567890.12345'
}
possible_errors = [404, Timeout(), Exception('kaboom!')]
possible_errors = [Timeout(), Exception('kaboom!')]
codes = [random.choice(possible_errors) for i in
range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# one error log for each of the (replicas - 1) failed responses, plus
# one more reporting not enough responses to reconstruct
self.assertEqual(policy.object_ring.replicas, len(error_lines))
self.assertIn(
'Unable to get enough responses (%s error responses)'
% (policy.object_ring.replicas - 1),
error_lines[-1],
"Unexpected error line found: %s" % error_lines[-1])
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_all_404s_fails(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
policy = self.policy
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
codes = [404 for i in range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# only 1 error log, reporting not enough responses
self.assertEqual(1, len(error_lines))
self.assertIn(
'Unable to get enough responses (%s error responses)'
% (policy.object_ring.replicas - 1),
error_lines[0],
"Unexpected error line found: %s" % error_lines[0])
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_old_etag(self):
job = {
@@ -2788,13 +2902,14 @@ class TestObjectReconstructor(unittest.TestCase):
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
# bad response
broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# bad response
bad_headers = {
bad_headers = get_header_frag_index(self, broken_body)
bad_headers.update({
'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal,
}
})
# good responses
responses = list()
@@ -2821,6 +2936,10 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errors or warnings
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_new_etag(self):
job = {
'partition': 0,
@@ -2861,17 +2980,164 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# one newer etag can spoil the bunch
new_index = random.randint(0, len(responses) - self.policy.ec_nparity)
# one newer etag won't spoil the bunch
new_index = random.randint(0, self.policy.ec_ndata - 1)
new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal})
new_response = (200, '', new_headers)
responses[new_index] = new_response
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errors or warnings
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_etag_with_same_timestamp(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
# good responses
responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
# sanity check before negative test
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# a different etag at the same timestamp won't spoil the bunch
# N.B. (FIXME): if we chose the first response as the garbage one, the
# reconstruction would fail because all of the other *correct* frags
# would be assumed to be garbage. To avoid that spurious failure the
# randint range is [1, self.policy.ec_ndata - 1] so the first response
# is always a correct fragment to reconstruct from
new_index = random.randint(1, self.policy.ec_ndata - 1)
new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage'})
new_response = (200, '', new_headers)
responses[new_index] = new_response
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# an error log for the etag mismatch, but no warning
error_log_lines = self.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_log_lines))
self.assertIn(
'Mixed Etag (some garbage, %s) for 10.0.0.1:1001/sdb/0/a/c/o '
'policy#%s frag#1' % (etag, int(self.policy)),
error_log_lines[0])
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_not_enough_etags_fail(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# create 3 different ec bodies
for i in range(3):
body = test_data[i:]
archive_bodies = make_ec_archive_bodies(self.policy, body)
# pop the fragment belonging to the destination node's index
archive_bodies.pop(1)
ec_archive_dict[
(md5(body).hexdigest(), next(ts).internal)] = archive_bodies
responses = list()
# fill out the response list, cycling through bodies of the 3 different etags
for etag, ts in itertools.cycle(ec_archive_dict):
body = ec_archive_dict[(etag, ts)].pop(0)
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Timestamp': ts})
responses.append((200, body, headers))
if len(responses) >= (self.policy.object_ring.replicas - 1):
break
# sanity: there are 3 different etags and no single etag has enough
# (i.e. at least ec_ndata) bodies
etag_count = collections.Counter(
[in_resp_headers['X-Object-Sysmeta-Ec-Etag']
for _, _, in_resp_headers in responses])
self.assertEqual(3, len(etag_count))
for etag, count in etag_count.items():
self.assertLess(count, self.policy.ec_ndata)
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, dict(metadata))
job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# 3 error logs, one per etag, each reporting not enough responses
self.assertEqual(3, len(error_lines))
for error_line in error_lines:
for expected_etag, ts in ec_archive_dict:
if expected_etag in error_line:
break
else:
self.fail(
"no expected etag %s found: %s" %
(list(ec_archive_dict), error_line))
expected = 'Unable to get enough responses (%s/10) to ' \
'reconstruct 10.0.0.1:1001/sdb/0/a/c/o policy#0 ' \
'frag#1 with ETag' % etag_count[expected_etag]
self.assertIn(
expected, error_line,
"Unexpected error line found: Expected: %s Got: %s"
% (expected, error_line))
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_finds_itself_does_not_fail(self):
job = {
@@ -2897,12 +3163,9 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies = ec_archive_bodies[:-1]
def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
@@ -2915,6 +3178,31 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errors or warnings
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
# the existing copy of our own frag is reported in a debug message
debug_log_lines = self.logger.get_lines_for_level('debug')
self.assertEqual(2, len(debug_log_lines))
self.assertIn(
'Found existing frag #1 while rebuilding #1 from',
debug_log_lines[0])
# ... and then it should be skipped in the responses
# N.B. in the future we could drop these checks, because simply
# sending the found copy rather than reconstructing will definitely
# save resources. Another reason we avoid feeding the destination
# index fragment into the reconstruct call is that doing so causes a
# bunch of warning logs from liberasurecode[1].
# 1: https://github.com/openstack/liberasurecode/blob/
# master/src/erasurecode.c#L870
expected_prefix = 'Reconstruct frag #1 with frag indexes'
self.assertIn(expected_prefix, debug_log_lines[1])
got_frag_index_list = json.loads(
debug_log_lines[1][len(expected_prefix):])
self.assertNotIn(1, got_frag_index_list)
def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = {
'partition': 0,
@@ -2940,12 +3228,9 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies)[:-num_duplicates]
def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
@@ -2958,6 +3243,224 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errors or warnings
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
debug_log_lines = self.logger.get_lines_for_level('debug')
self.assertEqual(1, len(debug_log_lines))
expected_prefix = 'Reconstruct frag #1 with frag indexes'
self.assertIn(expected_prefix, debug_log_lines[0])
got_frag_index_list = json.loads(
debug_log_lines[0][len(expected_prefix):])
self.assertNotIn(1, got_frag_index_list)
def test_reconstruct_fa_missing_headers(self):
# Negative tests asserting the behavior when expected headers are
# missing from the responses used to gather fragments for
# reconstruction
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
def test_missing_header(missing_header, expected_warning):
self.logger._clear()
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
responses[0][2].update({missing_header: None})
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no error
self.assertFalse(self.logger.get_lines_for_level('error'))
# ...but a warning for the missing header
warning_log_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_log_lines))
self.assertIn(expected_warning, warning_log_lines)
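# the response with the broken header is skipped, but the remaining
# responses still supply at least ec_ndata usable fragments, so each
# case below should rebuild successfully and log only a warning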
message_base = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0 frag#0"
test_missing_header(
'X-Object-Sysmeta-Ec-Etag',
"%s %s" % (message_base, "(missing Etag)"))
test_missing_header(
'X-Object-Sysmeta-Ec-Frag-Index',
"%s %s" % (message_base,
"(invalid X-Object-Sysmeta-Ec-Frag-Index)"))
test_missing_header(
'X-Backend-Timestamp',
"%s %s" % (message_base, "(missing X-Backend-Timestamp)"))
def test_reconstruct_fa_invalid_frag_index_headers(self):
# Negative tests asserting the behavior when the EC frag index header
# in the responses used to gather fragments for reconstruction
# carries an invalid value
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
def test_invalid_ec_frag_index_header(invalid_frag_index):
self.logger._clear()
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
responses[0][2].update({
'X-Object-Sysmeta-Ec-Frag-Index': invalid_frag_index})
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no error
self.assertFalse(self.logger.get_lines_for_level('error'))
# ...but a warning for the invalid header
warning_log_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_log_lines))
expected_message = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
"policy#0 frag#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
self.assertIn(expected_message, warning_log_lines)
for value in ('None', 'invalid'):
test_invalid_ec_frag_index_header(value)
@patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
ec_segment_size=4096,
ec_duplication_factor=2)],
fake_ring_args=[{'replicas': 28}])
class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
def setUp(self):
super(TestObjectReconstructorECDuplicationFactor, self).setUp()
self.fabricated_ring = FabricatedRing(replicas=28, devices=56)
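# rough sketch of the layout exercised here, assuming ring node index i
# maps to frag index i % ec_n_unique_fragments:
#   ec_ndata=10, ec_nparity=4, ec_duplication_factor=2
#   => ec_n_unique_fragments = 14 and 28 ring replicas, where ring node
#      index i holds X-Object-Sysmeta-Ec-Frag-Index i % 14 (e.g. nodes
#      0 and 14 both hold duplicate copies of frag #0)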
def _test_reconstruct_with_duplicate_frags_no_errors(self, index):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[index]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(index)
responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
# make a hook point at
# swift.obj.reconstructor.ObjectReconstructor._get_response
called_headers = []
orig_func = object_reconstructor.ObjectReconstructor._get_response
def _get_response_hook(self, node, part, path, headers, policy):
called_headers.append(headers)
return orig_func(self, node, part, path, headers, policy)
# need m + 1 node failures to reach the 2nd set of duplicated fragments
failed_start_at = (
self.policy.ec_n_unique_fragments - self.policy.ec_nparity - 1)
# set Timeout for node #10, #11, #12, #13, #14
for i in range(self.policy.ec_nparity + 1):
responses[failed_start_at + i] = (Timeout(), '', '')
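# with those primaries timing out, the duplicate copies of the same
# frag indexes elsewhere in the ring should still provide ec_ndata
# unique fragments, so the rebuild below is expected to succeed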
codes, body_iter, headers = zip(*responses)
get_response_path = \
'swift.obj.reconstructor.ObjectReconstructor._get_response'
with mock.patch(get_response_path, _get_response_hook):
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
for called_header in called_headers:
called_header = HeaderKeyDict(called_header)
self.assertTrue('Content-Length' in called_header)
self.assertEqual(called_header['Content-Length'], '0')
self.assertTrue('User-Agent' in called_header)
user_agent = called_header['User-Agent']
self.assertTrue(user_agent.startswith('obj-reconstructor'))
def test_reconstruct_with_duplicate_frags_no_errors(self):
# any fragment can be the broken one
for index in range(28):
self._test_reconstruct_with_duplicate_frags_no_errors(index)
if __name__ == '__main__':
unittest.main()
