From e4972f5ac731f97b26891012107b4b8492e171fa Mon Sep 17 00:00:00 2001
From: Alistair Coles
Date: Sat, 25 Feb 2017 20:28:13 -0800
Subject: [PATCH] Fixups for EC frag duplication tests

Follow up for related change:
- fix typos
- use common helper methods
- refactor some tests to reduce duplicate code

Related-Change: Idd155401982a2c48110c30b480966a863f6bd305
Change-Id: I2f91a2f31e4c1b11f3d685fa8166c1a25eb87429
---
 test/unit/__init__.py                   |   3 +-
 test/unit/common/test_utils.py          |   2 +
 test/unit/obj/test_reconstructor.py     |  85 +++++++---------
 test/unit/proxy/controllers/test_obj.py | 128 +++++++++++-------------
 test/unit/proxy/test_server.py          |   2 +-
 5 files changed, 98 insertions(+), 122 deletions(-)

diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 8590a28d..c54e77a3 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -1147,7 +1147,8 @@ def encode_frag_archive_bodies(policy, body):
     # encode the buffers into fragment payloads
     fragment_payloads = []
     for chunk in chunks:
-        fragments = policy.pyeclib_driver.encode(chunk)
+        fragments = policy.pyeclib_driver.encode(chunk) \
+            * policy.ec_duplication_factor
         if not fragments:
             break
         fragment_payloads.append(fragments)
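
For reference, the change above folds ec_duplication_factor into the shared
encode_frag_archive_bodies() helper: each segment is EC-encoded once and the
resulting fragment list is repeated, so every duplicate node set receives a
copy of the same unique fragments. A minimal standalone sketch of that
computation, with toy_encode() standing in for policy.pyeclib_driver.encode():

    def toy_encode(chunk, ndata=4, nparity=2):
        # stand-in encoder: real EC computes parity; here we just slice
        size = -(-len(chunk) // ndata)  # ceil division
        data = [chunk[i * size:(i + 1) * size] for i in range(ndata)]
        return data + ['P%d' % i for i in range(nparity)]

    def encode_frag_archive_bodies(body, segment_size=8, duplication_factor=2):
        chunks = [body[x:x + segment_size]
                  for x in range(0, len(body), segment_size)]
        fragment_payloads = []
        for chunk in chunks:
            fragments = toy_encode(chunk) * duplication_factor
            if not fragments:
                break
            fragment_payloads.append(fragments)
        # join up the fragment payloads per node
        return [''.join(frags) for frags in zip(*fragment_payloads)]

    bodies = encode_frag_archive_bodies('rebuild' * 4)
    assert len(bodies) == (4 + 2) * 2  # unique frags * duplication factor
    assert bodies[0] == bodies[6]      # node 6 mirrors node 0
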
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index a34d4f06..95b36b65 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -2584,6 +2584,8 @@ cluster_dfw1 = http://dfw1.host/v1/
             1: 1,
             '2': 2,
             '1024': 1024,
+            '0': ValueError,
+            '-1': ValueError,
             '0x01': ValueError,
             'asdf': ValueError,
             None: ValueError,
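
The two new entries pin down the lower bound of this expectations table: zero
and negative strings must raise, not parse. Judging from the related change,
the helper under test is Swift's positive-integer config parser
(config_positive_int_value() in swift.common.utils); a hedged sketch of the
contract the table encodes, using a hypothetical local name:

    def parse_positive_int(value):
        # hypothetical stand-in for the helper under test
        try:
            result = int(value)   # rejects None, '0x01', 'asdf', ...
        except (TypeError, ValueError):
            raise ValueError('not a positive int: %r' % (value,))
        if result < 1:
            raise ValueError('not a positive int: %r' % (value,))  # '0', '-1'
        return result

    assert parse_positive_int('1024') == 1024
    for bad in ('0', '-1', '0x01', 'asdf', None):
        try:
            parse_positive_int(bad)
        except ValueError:
            pass
        else:
            raise AssertionError('%r should have raised' % (bad,))
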
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 0ef46cf9..2339ad92 100644
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -42,7 +42,7 @@ from swift.obj.reconstructor import REVERT
 
 from test.unit import (patch_policies, debug_logger, mocked_http_conn,
                        FabricatedRing, make_timestamp_iter,
-                       DEFAULT_TEST_EC_TYPE)
+                       DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies)
 from test.unit.obj.common import write_diskfile
 
 
@@ -65,25 +65,6 @@ def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
         yield fake_ssync
 
 
-def make_ec_archive_bodies(policy, test_body):
-    segment_size = policy.ec_segment_size
-    # split up the body into buffers
-    chunks = [test_body[x:x + segment_size]
-              for x in range(0, len(test_body), segment_size)]
-    # encode the buffers into fragment payloads
-    fragment_payloads = []
-    for chunk in chunks:
-        fragments = \
-            policy.pyeclib_driver.encode(chunk) * policy.ec_duplication_factor
-        if not fragments:
-            break
-        fragment_payloads.append(fragments)
-
-    # join up the fragment payloads per node
-    ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)]
-    return ec_archive_bodies
-
-
 def _create_test_rings(path):
     testgz = os.path.join(path, 'object.ring.gz')
     intended_replica2part2dev_id = [
@@ -2636,7 +2617,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
 
         responses = list()
@@ -2702,7 +2683,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(4)
@@ -2744,7 +2725,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(4)
@@ -2774,7 +2755,7 @@ class TestObjectReconstructor(unittest.TestCase):
             df = self.reconstructor.reconstruct_fa(
                 job, node, dict(metadata))
             fixed_body = ''.join(df.reader())
-            # ... this bad request should be treated like any other failure
+            # ... this bad response should be ignored like any other failure
            self.assertEqual(len(fixed_body), len(broken_body))
             self.assertEqual(md5(fixed_body).hexdigest(),
                              md5(broken_body).hexdigest())
@@ -2797,7 +2778,7 @@ class TestObjectReconstructor(unittest.TestCase):
         # segment size)
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-454]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         # the scheme is 10+4, so this gets a parity node
         broken_body = ec_archive_bodies.pop(-4)
@@ -2843,9 +2824,11 @@ class TestObjectReconstructor(unittest.TestCase):
            self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
                              job, node, metadata)
            error_lines = self.logger.get_lines_for_level('error')
-            # # of replicas failed and one more error log to report not enough
+            # # of replicas failed and one more error log to report not enough
             # responses to reconstruct.
            self.assertEqual(policy.object_ring.replicas, len(error_lines))
+            for line in error_lines[:-1]:
+                self.assertIn("Trying to GET", line)
            self.assertIn(
                 'Unable to get enough responses (%s error responses)'
                 % (policy.object_ring.replicas - 1),
@@ -2874,7 +2857,7 @@ class TestObjectReconstructor(unittest.TestCase):
            self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
                              job, node, metadata)
            error_lines = self.logger.get_lines_for_level('error')
-            # only 1 log to report no enough responses
+            # only 1 log to report not enough responses
            self.assertEqual(1, len(error_lines))
            self.assertIn(
                 'Unable to get enough responses (%s error responses)'
@@ -2900,11 +2883,11 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         # bad response
         broken_body = ec_archive_bodies.pop(1)
-        ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+        ts = make_timestamp_iter()
         bad_headers = get_header_frag_index(self, broken_body)
         bad_headers.update({
             'X-Object-Sysmeta-Ec-Etag': 'some garbage',
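
Several hunks in this file swap the inline generator expression for the shared
make_timestamp_iter() helper. A rough, hedged equivalent of what that helper
provides (see test/unit/__init__.py for the real one): a monotonically
increasing source of Swift timestamps, one whole second apart, so tests stop
re-spelling the same expression:

    import itertools
    import time

    class Timestamp(object):  # stand-in for swift.common.utils.Timestamp
        def __init__(self, t):
            self.timestamp = float(t)

        @property
        def internal(self):
            return '%016.05f' % self.timestamp

    def make_timestamp_iter():
        return iter(Timestamp(t)
                    for t in itertools.count(int(time.time())))

    ts = make_timestamp_iter()
    t0, t1 = next(ts), next(ts)
    assert t1.timestamp == t0.timestamp + 1
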
@@ -2920,8 +2903,8 @@ class TestObjectReconstructor(unittest.TestCase):
                 'X-Backend-Timestamp': t1})
             responses.append((200, body, headers))
 
-        # mixed together
-        error_index = random.randint(0, self.policy.ec_ndata)
+        # include the one older frag with different etag in first responses
+        error_index = random.randint(0, self.policy.ec_ndata - 1)
         error_headers = get_header_frag_index(self,
                                               (responses[error_index])[1])
         error_headers.update(bad_headers)
@@ -2956,10 +2939,10 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
-        ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+        ts = make_timestamp_iter()
 
         # good responses
         responses = list()
@@ -3016,7 +2999,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
@@ -3037,12 +3020,12 @@ class TestObjectReconstructor(unittest.TestCase):
             self.assertEqual(md5(fixed_body).hexdigest(),
                              md5(broken_body).hexdigest())
 
-        # one newer timestamp but same etag won't spoil the bunch
-        # N.B. (FIXIME). we choose the first response as garbage, the
+        # a response at same timestamp but different etag won't spoil the bunch
+        # N.B. (FIXME). if we choose the first response as garbage, the
         # reconstruction fails because all other *correct* frags will be
         # assumed as garbage. To avoid the freaky failing set randint
         # as [1, self.policy.ec_ndata - 1] to make the first response
-        # being the correct fragment to reconstruct
+        # always have the correct etag to reconstruct
         new_index = random.randint(1, self.policy.ec_ndata - 1)
         new_headers = get_header_frag_index(self, (responses[new_index])[1])
         new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage'})
@@ -3057,7 +3040,7 @@ class TestObjectReconstructor(unittest.TestCase):
         self.assertEqual(md5(fixed_body).hexdigest(),
                          md5(broken_body).hexdigest())
 
-        # no error and warning
+        # expect an error log but no warnings
         error_log_lines = self.logger.get_lines_for_level('error')
         self.assertEqual(1, len(error_log_lines))
         self.assertIn(
@@ -3082,11 +3065,11 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         ec_archive_dict = dict()
-        ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+        ts = make_timestamp_iter()
         # create 3 different ec bodies
         for i in range(3):
             body = test_data[i:]
-            archive_bodies = make_ec_archive_bodies(self.policy, body)
+            archive_bodies = encode_frag_archive_bodies(self.policy, body)
             # pop the index to the destination node
             archive_bodies.pop(1)
             ec_archive_dict[
@@ -3118,7 +3101,7 @@ class TestObjectReconstructor(unittest.TestCase):
                           job, node, metadata)
 
         error_lines = self.logger.get_lines_for_level('error')
-        # only 1 log to report no enough responses
+        # 1 error log per etag to report not enough responses
         self.assertEqual(3, len(error_lines))
         for error_line in error_lines:
             for expected_etag, ts in ec_archive_dict:
@@ -3155,7 +3138,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         # instead of popping the broken body, we'll just leave it in the list
         # of responses and take away something else.
@@ -3190,7 +3173,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         # ... and then, it should be skipped in the responses
         # N.B. in the future, we could avoid those check because
-        # definately sending the copy rather than reconstruct will
+        # definitely sending the copy rather than reconstruct will
         # save resources. But one more reason, we're avoiding to
         # use the dest index fragment even if it goes to reconstruct
         # function is that it will cause a bunch of warning log from
@@ -3219,7 +3202,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
 
         # add some duplicates
@@ -3274,7 +3257,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
@@ -3300,7 +3283,7 @@ class TestObjectReconstructor(unittest.TestCase):
             self.assertEqual(md5(fixed_body).hexdigest(),
                              md5(broken_body).hexdigest())
 
-        # no errorg
+        # no errors
         self.assertFalse(self.logger.get_lines_for_level('error'))
         # ...but warning for the missing header
         warning_log_lines = self.logger.get_lines_for_level('warning')
@@ -3341,7 +3324,7 @@ class TestObjectReconstructor(unittest.TestCase):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
@@ -3368,7 +3351,7 @@ class TestObjectReconstructor(unittest.TestCase):
             self.assertEqual(md5(fixed_body).hexdigest(),
                              md5(broken_body).hexdigest())
 
-        # no errorg
+        # no errors
         self.assertFalse(self.logger.get_lines_for_level('error'))
         # ...but warning for the invalid header
         warning_log_lines = self.logger.get_lines_for_level('warning')
@@ -3409,7 +3392,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
 
         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
         etag = md5(test_data).hexdigest()
-        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+        ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(index)
@@ -3428,11 +3411,11 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
                 called_headers.append(headers)
                 return orig_func(self, node, part, path, headers, policy)
 
-        # need to m + 1 node failures to reach 2nd set of duplicated fragments
+        # need parity + 1 node failures to reach duplicated fragments
         failed_start_at = (
             self.policy.ec_n_unique_fragments - self.policy.ec_nparity - 1)
 
-        # set Timeout for node #10, #11, #12, #13, #14
+        # set Timeout for node #9, #10, #11, #12, #13
         for i in range(self.policy.ec_nparity + 1):
             responses[failed_start_at + i] = (Timeout(), '', '')
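
To see why parity + 1 failures are needed to reach the duplicated fragments:
with the 10+4 scheme and duplication factor 2 used around these tests, there
are 14 unique fragments across 28 primaries, and reconstruction needs
ec_ndata = 10 distinct fragments. A worked check of the placement arithmetic
(plain Python, constants assumed from this test class):

    ec_ndata, ec_nparity, dup = 10, 4, 2
    ec_n_unique_fragments = ec_ndata + ec_nparity          # 14
    replicas = ec_n_unique_fragments * dup                 # 28 primaries

    failed_start_at = ec_n_unique_fragments - ec_nparity - 1   # node #9
    failed = list(range(failed_start_at, failed_start_at + ec_nparity + 1))
    assert failed == [9, 10, 11, 12, 13]

    # the survivors in the first (unique) set are nodes #0..#8: only 9
    # unique fragments, one short of ec_ndata, so the reconstructor must
    # continue into the duplicated set (#14..#27) for a 10th distinct frag
    assert failed_start_at == ec_ndata - 1
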
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index 59545ae6..a73608ed 100644
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -2205,7 +2205,7 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
 
         _part, primary_nodes = self.obj_ring.get_nodes('a', 'c', 'o')
         node_key = lambda n: (n['ip'], n['port'])
-        backend_index = lambda index: self.policy.get_backend_index(index)
+        backend_index = self.policy.get_backend_index
         ts = self._ts_iter.next()
 
         response_map = {
@@ -3654,68 +3654,58 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
 
 class TestECFunctions(unittest.TestCase):
     def test_chunk_transformer(self):
-        segment_size = 1024
-        orig_chunk = 'a' * segment_size
-        policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
-                                 ec_ndata=8, ec_nparity=2,
-                                 object_ring=FakeRing(replicas=10),
-                                 ec_segment_size=segment_size)
-        expected = policy.pyeclib_driver.encode(orig_chunk)
-        transform = obj.chunk_transformer(
-            policy, policy.object_ring.replica_count)
-        transform.send(None)
+        def do_test(dup):
+            segment_size = 1024
+            orig_chunk = 'a' * segment_size
+            policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
+                                     ec_ndata=8, ec_nparity=2,
+                                     object_ring=FakeRing(replicas=10 * dup),
+                                     ec_segment_size=segment_size,
+                                     ec_duplication_factor=dup)
+            expected = policy.pyeclib_driver.encode(orig_chunk)
+            transform = obj.chunk_transformer(
+                policy, policy.object_ring.replica_count)
+            transform.send(None)
 
-        backend_chunks = transform.send(orig_chunk)
-        self.assertNotEqual(None, backend_chunks)  # sanity
-        self.assertEqual(
-            len(backend_chunks), policy.object_ring.replica_count)
-        self.assertEqual(expected, backend_chunks)
+            backend_chunks = transform.send(orig_chunk)
+            self.assertIsNotNone(backend_chunks)  # sanity
+            self.assertEqual(
+                len(backend_chunks), policy.object_ring.replica_count)
+            self.assertEqual(expected * dup, backend_chunks)
 
-    def test_chunk_transformer_duplication_factor(self):
-        segment_size = 1024
-        orig_chunk = 'a' * segment_size
-        policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
-                                 ec_ndata=8, ec_nparity=2,
-                                 object_ring=FakeRing(replicas=20),
-                                 ec_segment_size=segment_size,
-                                 ec_duplication_factor=2)
-        expected = policy.pyeclib_driver.encode(orig_chunk)
-        transform = obj.chunk_transformer(
-            policy, policy.object_ring.replica_count)
-        transform.send(None)
+            # flush out last chunk buffer
+            backend_chunks = transform.send('')
+            self.assertEqual(
+                len(backend_chunks), policy.object_ring.replica_count)
+            self.assertEqual([''] * policy.object_ring.replica_count,
+                             backend_chunks)
 
-        backend_chunks = transform.send(orig_chunk)
-        self.assertNotEqual(None, backend_chunks)  # sanity
-        self.assertEqual(
-            len(backend_chunks), policy.object_ring.replica_count)
-        self.assertEqual(expected * 2, backend_chunks)
-
-        # flush out last chunk buffer
-        backend_chunks = transform.send('')
-        self.assertEqual(
-            len(backend_chunks), policy.object_ring.replica_count)
-        self.assertEqual([''] * policy.object_ring.replica_count,
-                         backend_chunks)
-
-    def test_chunk_transformer_duplication_factor_non_aligned_last_chunk(self):
+        do_test(1)
+        do_test(2)
+        do_test(3)
+
+    def test_chunk_transformer_non_aligned_last_chunk(self):
         last_chunk = 'a' * 128
-        policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
-                                 ec_ndata=8, ec_nparity=2,
-                                 object_ring=FakeRing(replicas=20),
-                                 ec_segment_size=1024,
-                                 ec_duplication_factor=2)
-        expected = policy.pyeclib_driver.encode(last_chunk)
-        transform = obj.chunk_transformer(
-            policy, policy.object_ring.replica_count)
-        transform.send(None)
 
-        transform.send(last_chunk)
-        # flush out last chunk buffer
-        backend_chunks = transform.send('')
+        def do_test(dup):
+            policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
+                                     ec_ndata=8, ec_nparity=2,
+                                     object_ring=FakeRing(replicas=10 * dup),
+                                     ec_segment_size=1024,
+                                     ec_duplication_factor=dup)
+            expected = policy.pyeclib_driver.encode(last_chunk)
+            transform = obj.chunk_transformer(
+                policy, policy.object_ring.replica_count)
+            transform.send(None)
 
-        self.assertEqual(
-            len(backend_chunks), policy.object_ring.replica_count)
-        self.assertEqual(expected * 2, backend_chunks)
+            transform.send(last_chunk)
+            # flush out last chunk buffer
+            backend_chunks = transform.send('')
+
+            self.assertEqual(
+                len(backend_chunks), policy.object_ring.replica_count)
+            self.assertEqual(expected * dup, backend_chunks)
+        do_test(1)
+        do_test(2)
 
 
 @patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
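
The refactored tests above drive obj.chunk_transformer through its coroutine
protocol: prime with send(None), feed body chunks, flush with send(''). A
simplified stand-in (not Swift's implementation) of that protocol, where
fake_encode() merely marks each segment instead of EC-encoding it:

    def chunk_transformer(segment_size, replica_count, dup):
        def fake_encode(segment):
            # one "fragment" per unique node; real code calls pyeclib encode()
            unique = replica_count // dup
            return ['<%d:%s>' % (i, segment) for i in range(unique)]

        buf = ''
        chunk = yield  # primed by send(None)
        while chunk:
            buf += chunk
            out = None
            if len(buf) >= segment_size:
                out = fake_encode(buf[:segment_size]) * dup
                buf = buf[segment_size:]
            chunk = yield out
        # flush: encode any remainder, else emit empty strings per node
        yield (fake_encode(buf) * dup if buf else [''] * replica_count)

    transform = chunk_transformer(segment_size=4, replica_count=6, dup=2)
    transform.send(None)
    assert transform.send('abcd') == ['<0:abcd>', '<1:abcd>', '<2:abcd>'] * 2
    assert transform.send('') == [''] * 6

This is also why the assertions read expected * dup: the transformer emits the
unique fragment list repeated once per duplicate set.
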
@@ -3767,7 +3757,7 @@ class TestECDuplicationObjController(
             index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
             collected_responses[etag].add(index)
 
-        # the backend requests should be >= num_data_fragmetns
+        # the backend requests should be >= num_data_fragments
         self.assertGreaterEqual(len(log), self.policy.ec_ndata)
         # but <= # of replicas
         self.assertLessEqual(len(log), self.replicas())
@@ -3820,9 +3810,9 @@ class TestECDuplicationObjController(
             {'obj': obj, 'frag': 5},
             {'obj': obj, 'frag': 6},
             {'obj': obj, 'frag': 6},
-            {'obj': obj, 'frag': 7},
-            {'obj': obj, 'frag': 7},
             # second half of # of replicas are 7, 8, 9, 10, 11, 12, 13
+            {'obj': obj, 'frag': 7},
+            {'obj': obj, 'frag': 7},
             {'obj': obj, 'frag': 8},
             {'obj': obj, 'frag': 8},
             {'obj': obj, 'frag': 9},
@@ -3865,7 +3855,7 @@ class TestECDuplicationObjController(
             {'obj': obj2, 'frag': 8},
         ]
         # ... and the rests are 404s which is limited by request_count
-        # (2 * replicas in default) rather than max_extra_count limitation
+        # (2 * replicas in default) rather than max_extra_requests limitation
         # because the retries will be in ResumingGetter if the responses
         # are 404s
         node_frags += [[]] * (self.replicas() * 2 - len(node_frags))
@@ -3917,7 +3907,7 @@ class TestECDuplicationObjController(
         ]
 
         # ... and the rests are 404s which is limited by request_count
-        # (2 * replicas in default) rather than max_extra_count limitation
+        # (2 * replicas in default) rather than max_extra_requests limitation
         # because the retries will be in ResumingGetter if the responses
         # are 404s
         node_frags += [[]] * (self.replicas() * 2 - len(node_frags))
@@ -3991,9 +3981,9 @@ class TestECDuplicationObjController(
         # ... regardless we should never need to fetch more than ec_ndata
         # frags for any given etag
         for etag, frags in collected_responses.items():
-            self.assertTrue(len(frags) <= self.policy.ec_ndata,
-                            'collected %s frags for etag %s' % (
-                                len(frags), etag))
+            self.assertLessEqual(len(frags), self.policy.ec_ndata,
+                                 'collected %s frags for etag %s' % (
+                                     len(frags), etag))
 
     def test_GET_with_duplicate_but_sufficient_frag_indexes(self):
         obj1 = self._make_ec_object_stub()
@@ -4108,9 +4098,9 @@ class TestECDuplicationObjController(
         # ... regardless we should never need to fetch more than ec_ndata
         # frags for any given etag
         for etag, frags in collected_responses.items():
-            self.assertTrue(len(frags) <= self.policy.ec_ndata,
-                            'collected %s frags for etag %s' % (
-                                len(frags), etag))
+            self.assertLessEqual(len(frags), self.policy.ec_ndata,
+                                 'collected %s frags for etag %s' % (
+                                     len(frags), etag))
 
     def test_GET_with_mixed_frags_and_no_quorum_will_503(self):
         # all nodes have a frag but there is no one set that reaches quorum,
@@ -4332,7 +4322,7 @@ class TestECDuplicationObjController(
             unique, self.policy.get_backend_index(duplicated))  # sanity
 
         putters.pop(duplicated)
-        # pop one more frag ment too to make one missing hole
+        # pop one more fragment too to make one missing hole
         putters.pop(one_more_missing)
 
         # then determine chunk, we have 26 putters here and unique frag
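
Two pieces of arithmetic recur through the tests above: get_backend_index()
folds a duplicated node index back onto its unique fragment index, and the
404 padding is sized by request_count = 2 * replicas. A hedged sketch of both,
with constants assumed from this test class's ec 10+4, duplication factor 2
policy:

    ec_n_unique_fragments = 10 + 4
    replicas = ec_n_unique_fragments * 2  # 28 primary nodes

    def get_backend_index(node_index):
        # duplicated primaries hold copies of the same unique fragment
        return node_index % ec_n_unique_fragments

    assert get_backend_index(7) == get_backend_index(21) == 7

    # node_frags is padded with [] (no frag -> 404) up to request_count,
    # which defaults to 2 * replicas: after that many responses the proxy
    # stops asking, rather than applying the max_extra_requests limit
    node_frags = [{'frag': i} for i in range(ec_n_unique_fragments)]
    node_frags += [[]] * (replicas * 2 - len(node_frags))
    assert len(node_frags) == 56
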
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 183682ac..50e7466c 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -5665,7 +5665,7 @@ class TestECMismatchedFA(unittest.TestCase):
             environ={"REQUEST_METHOD": "PUT"},
             headers={"X-Storage-Policy": "ec-dup",
                      "X-Auth-Token": "t"})
         resp = ensure_container.get_response(prosrv)
-        self.assertTrue(resp.status_int in (201, 202))
+        self.assertIn(resp.status_int, (201, 202))
 
         obj1 = "first version..."
         put_req1 = Request.blank(
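
The assertion swaps running through this patch (assertIn, assertLessEqual and
assertIsNotNone in place of assertTrue/assertNotEqual) all buy the same thing:
on failure, unittest reports the operands instead of a bare boolean. A toy
illustration of the difference (both lines assert the same condition):

    import unittest

    class Demo(unittest.TestCase):
        def test_status(self):
            status = 201
            # on failure this reports only "False is not true":
            self.assertTrue(status in (201, 202))
            # on failure this reports the operands,
            # e.g. "503 not found in (201, 202)":
            self.assertIn(status, (201, 202))

    if __name__ == '__main__':
        unittest.main()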