Optimize ec duplication and its md5 hashing

Originally, EC duplication was designed to make a duplicated copy in the
chunk_transformer function for each node. However, this can cause doubled
(or more, depending on ec_duplication_factor) etag calculation, which
can be expensive for the Swift proxy.

With this patch, chunk_transformer keeps only one slot per unique
fragment, and then send_chunk in ECObjectController._transfer_data
picks the suitable chunk, by the unique backend fragment index assigned
by the _determine_chunk_destinations function, to send to each object server.

Note that Putter still keeps node_index, but the new putter_to_frag_index
dict and frag_hashers (chunk_index and chunk_hashers under the old names)
now refer to values by fragment index.

Change-Id: Ib9e8a6f67c2985164dd20b049c7f144f19fd1822
This commit is contained in:
Kota Tsuyuzaki 2017-01-17 20:27:09 -08:00
parent cf1c44dff0
commit 4187c839c3
2 changed files with 72 additions and 64 deletions

View File

@ -1747,7 +1747,12 @@ class MIMEPutter(Putter):
mime_boundary, multiphase=need_multiphase)
def chunk_transformer(policy, nstreams):
def chunk_transformer(policy):
"""
A generator to transform a source chunk to erasure coded chunks for each
`send` call. The number of erasure coded chunks produced is
policy.ec_n_unique_fragments.
"""
segment_size = policy.ec_segment_size
buf = collections.deque()
@ -1775,9 +1780,8 @@ def chunk_transformer(policy, nstreams):
frags_by_byte_order = []
for chunk_to_encode in chunks_to_encode:
encoded_chunks = policy.pyeclib_driver.encode(chunk_to_encode)
send_chunks = encoded_chunks * policy.ec_duplication_factor
frags_by_byte_order.append(send_chunks)
frags_by_byte_order.append(
policy.pyeclib_driver.encode(chunk_to_encode))
# Sequential calls to encode() have given us a list that
# looks like this:
#
@ -1802,9 +1806,9 @@ def chunk_transformer(policy, nstreams):
last_bytes = ''.join(buf)
if last_bytes:
last_frags = policy.pyeclib_driver.encode(last_bytes)
yield last_frags * policy.ec_duplication_factor
yield last_frags
else:
yield [''] * nstreams
yield [''] * policy.ec_n_unique_fragments
def trailing_metadata(policy, client_obj_hasher,
@ -2330,28 +2334,29 @@ class ECObjectController(BaseObjectController):
def _determine_chunk_destinations(self, putters, policy):
"""
Given a list of putters, return a dict where the key is the putter
and the value is the node index to use.
and the value is the frag index to use.
This is done so that we line up handoffs using the same node index
This is done so that we line up handoffs using the same frag index
(in the primary part list) as the primary that the handoff is standing
in for. This lets erasure-code fragment archives wind up on the
preferred local primary nodes when possible.
:param putters: a list of swift.proxy.controllers.obj.MIMEPutter
instance
:param policy: A policy instance
:param policy: A policy instance which should be one of ECStoragePolicy
"""
# Give each putter a "chunk index": the index of the
# Give each putter a "frag index": the index of the
# transformed chunk that we'll send to it.
#
# For primary nodes, that's just its index (primary 0 gets
# chunk 0, primary 1 gets chunk 1, and so on). For handoffs,
# we assign the chunk index of a missing primary.
handoff_conns = []
chunk_index = {}
putter_to_frag_index = {}
for p in putters:
if p.node_index is not None:
chunk_index[p] = p.node_index
putter_to_frag_index[p] = policy.get_backend_index(
p.node_index)
else:
handoff_conns.append(p)
@ -2362,35 +2367,33 @@ class ECObjectController(BaseObjectController):
# returns 507, in which case a handoff is used to replace it.
# lack_list is a dict of list to keep hole indexes
# e.g. if we have 2 holes for index 0 with ec_duplication_factor=2
# e.g. if we have 2 holes for frag index 0 with ec_duplication_factor=2
# lack_list is like {0: [0], 1: [0]}, and then, if 1 hole found
# for index 1, lack_list will be {0: [0, 1], 1: [0]}.
# for frag index 1, lack_list will be {0: [0, 1], 1: [0]}.
# After that, holes will be filled from bigger key
# (i.e. 1:[0] at first)
# Grouping all missing fragment indexes for each unique_index
unique_index_to_holes = collections.defaultdict(list)
available_indexes = chunk_index.values()
for node_index in range(policy.object_ring.replica_count):
if node_index not in available_indexes:
unique_index = policy.get_backend_index(node_index)
unique_index_to_holes[unique_index].append(node_index)
# Set the missing index to lack_list
# Grouping all missing fragment indexes for each frag_index
available_indexes = putter_to_frag_index.values()
lack_list = collections.defaultdict(list)
for unique_index, holes in unique_index_to_holes.items():
for lack_tier, hole_node_index in enumerate(holes):
lack_list[lack_tier].append(hole_node_index)
for frag_index in range(policy.ec_n_unique_fragments):
# Set the missing index to lack_list
available_count = available_indexes.count(frag_index)
# N.B. it should be duplication_factor >= lack >= 0
lack = policy.ec_duplication_factor - available_count
# now we are missing one or more nodes to store the frag index
for lack_tier in range(lack):
lack_list[lack_tier].append(frag_index)
# Extract the lack_list to a flat list
holes = []
for lack_tier, indexes in sorted(lack_list.items(), reverse=True):
holes.extend(indexes)
# Fill chunk_index list with the hole list
# Fill putter_to_frag_index list with the hole list
for hole, p in zip(holes, handoff_conns):
chunk_index[p] = hole
return chunk_index
putter_to_frag_index[p] = hole
return putter_to_frag_index
def _transfer_data(self, req, policy, data_source, putters, nodes,
min_conns, etag_hasher):
@ -2400,15 +2403,15 @@ class ECObjectController(BaseObjectController):
This method was added in the PUT method extraction change
"""
bytes_transferred = 0
chunk_transform = chunk_transformer(policy, len(nodes))
chunk_transform = chunk_transformer(policy)
chunk_transform.send(None)
chunk_hashers = collections.defaultdict(md5)
frag_hashers = collections.defaultdict(md5)
def send_chunk(chunk):
# Note: there's two different hashers in here. etag_hasher is
# hashing the original object so that we can validate the ETag
# that the client sent (and etag_hasher is None if the client
# didn't send one). The hasher in chunk_hashers is hashing the
# didn't send one). The hasher in frag_hashers is hashing the
# fragment archive being sent to the client; this lets us guard
# against data corruption on the network between proxy and
# object server.
@ -2420,11 +2423,17 @@ class ECObjectController(BaseObjectController):
# or whatever we're doing, the transform will give us None.
return
updated_frag_indexes = set()
for putter in list(putters):
ci = chunk_index[putter]
backend_chunk = backend_chunks[ci]
frag_index = putter_to_frag_index[putter]
backend_chunk = backend_chunks[frag_index]
if not putter.failed:
chunk_hashers[ci].update(backend_chunk)
# N.B. same frag_index will appear when using
# ec_duplication_factor >= 2. So skip to feed the chunk
# to hasher if the frag was updated already.
if frag_index not in updated_frag_indexes:
frag_hashers[frag_index].update(backend_chunk)
updated_frag_indexes.add(frag_index)
putter.send_chunk(backend_chunk)
else:
putter.close()
@ -2437,9 +2446,9 @@ class ECObjectController(BaseObjectController):
try:
with ContextPool(len(putters)) as pool:
# build our chunk index dict to place handoffs in the
# build our putter_to_frag_index dict to place handoffs in the
# same part nodes index as the primaries they are covering
chunk_index = self._determine_chunk_destinations(
putter_to_frag_index = self._determine_chunk_destinations(
putters, policy)
for putter in putters:
@ -2487,14 +2496,14 @@ class ECObjectController(BaseObjectController):
footers = {(k, v) for k, v in footers.items()
if not k.lower().startswith('x-object-sysmeta-ec-')}
for putter in putters:
ci = chunk_index[putter]
frag_index = putter_to_frag_index[putter]
# Update any footers set by middleware with EC footers
trail_md = trailing_metadata(
policy, etag_hasher,
bytes_transferred, policy.get_backend_index(ci))
bytes_transferred, frag_index)
trail_md.update(footers)
# Etag footer must always be hash of what we sent
trail_md['Etag'] = chunk_hashers[ci].hexdigest()
trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
putter.end_of_object_data(footer_metadata=trail_md)
for putter in putters:

View File

@ -3663,21 +3663,20 @@ class TestECFunctions(unittest.TestCase):
ec_segment_size=segment_size,
ec_duplication_factor=dup)
expected = policy.pyeclib_driver.encode(orig_chunk)
transform = obj.chunk_transformer(
policy, policy.object_ring.replica_count)
transform = obj.chunk_transformer(policy)
transform.send(None)
backend_chunks = transform.send(orig_chunk)
self.assertIsNotNone(backend_chunks) # sanity
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual(expected * dup, backend_chunks)
len(backend_chunks), policy.ec_n_unique_fragments)
self.assertEqual(expected, backend_chunks)
# flush out last chunk buffer
backend_chunks = transform.send('')
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual([''] * policy.object_ring.replica_count,
len(backend_chunks), policy.ec_n_unique_fragments)
self.assertEqual([''] * policy.ec_n_unique_fragments,
backend_chunks)
do_test(1)
do_test(2)
@ -3693,8 +3692,7 @@ class TestECFunctions(unittest.TestCase):
ec_segment_size=1024,
ec_duplication_factor=dup)
expected = policy.pyeclib_driver.encode(last_chunk)
transform = obj.chunk_transformer(
policy, policy.object_ring.replica_count)
transform = obj.chunk_transformer(policy)
transform.send(None)
transform.send(last_chunk)
@ -3702,8 +3700,9 @@ class TestECFunctions(unittest.TestCase):
backend_chunks = transform.send('')
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual(expected * dup, backend_chunks)
len(backend_chunks), policy.ec_n_unique_fragments)
self.assertEqual(expected, backend_chunks)
do_test(1)
do_test(2)
@ -4310,12 +4309,13 @@ class TestECDuplicationObjController(
got = controller._determine_chunk_destinations(putters, self.policy)
expected = {}
for i, p in enumerate(putters):
expected[p] = i
expected[p] = self.policy.get_backend_index(i)
# sanity
self.assertEqual(got, expected)
# now lets make an unique fragment as handoffs
putters[unique].node_index = None
handoff_putter = putters[unique]
handoff_putter.node_index = None
# and then, pop a fragment which has same fragment index with unique
self.assertEqual(
@ -4329,21 +4329,20 @@ class TestECDuplicationObjController(
# index 0 missing 2 copies and unique frag index 1 missing 1 copy
# i.e. the handoff node should be assigned to unique frag index 1
got = controller._determine_chunk_destinations(putters, self.policy)
self.assertEqual(len(expected) - 2, len(got))
# index one_more_missing should not be chosen
self.assertNotIn(one_more_missing, got.values())
# either index unique or duplicated should be in the got dict
self.assertTrue(
any([unique in got.values(), duplicated in got.values()]))
# but it's not both
self.assertFalse(
all([unique in got.values(), duplicated in got.values()]))
# N.B. len(putters) is now len(expected - 2) due to pop twice
self.assertEqual(len(putters), len(got))
# sanity, no node index - for handoff putter
self.assertIsNone(handoff_putter.node_index)
self.assertEqual(got[handoff_putter], unique)
# sanity, other nodes except handoff_putter have node_index
self.assertTrue(all(
[putter.node_index for putter in got if
putter != handoff_putter]))
def test_determine_chunk_destinations_prioritize_more_missing(self):
# drop 0, 14 and 1 should work
# drop node_index 0, 14 and 1 should work
self._test_determine_chunk_destinations_prioritize(0, 14, 1)
# drop 1, 15 and 0 should work, too
# drop node_index 1, 15 and 0 should work, too
self._test_determine_chunk_destinations_prioritize(1, 15, 0)