From 259afd2a756bd300ffac0591ab81d2faa160fd18 Mon Sep 17 00:00:00 2001 From: Matthew Oliver Date: Tue, 2 Dec 2025 16:41:23 +1100 Subject: [PATCH] ssync: Refactor tests to use Timestamp and new assert Created a new assertConnectionMessages() assert that will take it a list of message strings (without the length and added \r\n), these are added and compared against the actual connection messages passed in. I.e: self.assertEqual( b''.join(connection.sent), b'17\r\n:MISSING_CHECK: START\r\n\r\n' b'15\r\n:MISSING_CHECK: END\r\n\r\n') Becomes: self.assertConnectionMessages( [b':MISSING_CHECK: START', b':MISSING_CHECK: END'] connection.sent) And the hex lengths are automatically calculated. This because more useful when you pass in variable messages and timestamps. Change-Id: I405e218f6e6d6f12818f611e08d5c7f8963ee8c6 Signed-off-by: Matthew Oliver --- test/unit/obj/common.py | 8 + test/unit/obj/test_ssync_sender.py | 464 ++++++++++++++++------------- 2 files changed, 261 insertions(+), 211 deletions(-) diff --git a/test/unit/obj/common.py b/test/unit/obj/common.py index 41cc08fa3b..4cf8ff01c1 100644 --- a/test/unit/obj/common.py +++ b/test/unit/obj/common.py @@ -20,6 +20,7 @@ import unittest from swift.common import utils from swift.common.storage_policy import POLICIES from swift.common.utils import Timestamp, md5 +from test.unit import make_timestamp_iter def write_diskfile(df, timestamp, data=b'test data', frag_index=None, @@ -66,10 +67,17 @@ class BaseTest(unittest.TestCase): } # daemon will be set in subclass setUp self.daemon = None + self._ts_iter = make_timestamp_iter() def tearDown(self): shutil.rmtree(self.tmpdir, ignore_errors=True) + def ts(self): + """ + Timestamps - forever. + """ + return next(self._ts_iter) + def _make_diskfile(self, device='dev', partition='9', account='a', container='c', obj='o', body=b'test', extra_metadata=None, policy=None, diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 413937e7a7..332c74ebdb 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -96,8 +96,35 @@ class FakeConnection(object): self.closed = True +class SenderBase(BaseTest): + def assertConnectionMessages(self, expected_messages, messages, + cmds_with_newlines=False): + """Assert ssync sender messages + + You only need to provide the expected lines of text. The + hex length and \\r\\n will be added automatically, eg: + + [':MISSING_CHECK: START', ' ', ':MISSING_CHECK: END' ] + + Commands in the ssync protocol need to end with a \\r\\n. If you + decide to provide them (because request data tends to not incude them) + then you can use the `cmds_with_newlines` flag to indicate this. + + """ + expected = b"" + for line in expected_messages: + if isinstance(line, str): + line = wsgi_to_bytes(line) + if line: + if not cmds_with_newlines: + line = b"%s\r\n" % line + expected += b'%x\r\n%s\r\n' % (len(line), line) + + self.assertEqual(expected, b''.join(messages)) + + @patch_policies() -class TestSender(BaseTest): +class TestSender(SenderBase): def setUp(self): skip_if_no_xattrs() @@ -560,12 +587,14 @@ class TestSender(BaseTest): self.assertTrue(found_put) def test_call_and_missing_check(self): + ts = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ and policy == POLICIES.legacy: yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts}) else: raise Exception( 'No match for %r %r %r' % (device, partition, suffixes)) @@ -593,15 +622,17 @@ class TestSender(BaseTest): self.assertTrue(success) self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)})])) + {'ts_data': ts})])) def test_call_and_missing_check_with_obj_list(self): + ts = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ and policy == POLICIES.legacy: yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts}) else: raise Exception( 'No match for %r %r %r' % (device, partition, suffixes)) @@ -627,7 +658,7 @@ class TestSender(BaseTest): self.assertTrue(success) self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)})])) + {'ts_data': ts})])) def test_call_and_missing_check_with_obj_list_but_required(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -804,13 +835,16 @@ class TestSender(BaseTest): self.assertFalse(self.sender.limited_by_max_objects) def test_call_and_missing_check_timeout_send_line(self): + ts1 = self.ts() + ts2 = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts1}) yield ( '9d41d8cd98f00b204e9800998ecf0def', - {'ts_data': Timestamp(1380144471.00000)}) + {'ts_data': ts2}) response = FakeResponse() # max_objects unlimited @@ -836,11 +870,12 @@ class TestSender(BaseTest): '0 lines (0 bytes) sent', log_lines) self.assertFalse(self.sender.limited_by_max_objects) # only the first missing check item was sent, plus a disconnect line - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'0\r\n\r\n') + self.assertConnectionMessages( + [b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % + ts1.internal.encode('ascii'), + ''], + connection.sent) def test_missing_check_has_empty_suffixes(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -867,31 +902,33 @@ class TestSender(BaseTest): self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertConnectionMessages( + [b':MISSING_CHECK: START', + b':MISSING_CHECK: END'], + connection.sent) self.assertEqual(send_map, {}) self.assertEqual(available_map, {}) self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_has_suffixes(self): + timestamps = [self.ts() for _ in range(6)] + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == POLICIES.legacy and suffixes == ['abc', 'def']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': timestamps[0]}) yield ( '9d41d8cd98f00b204e9800998ecf0def', - {'ts_data': Timestamp(1380144472.22222), - 'ts_meta': Timestamp(1380144473.22222)}) + {'ts_data': timestamps[1], + 'ts_meta': timestamps[2]}) yield ( '9d41d8cd98f00b204e9800998ecf1def', - {'ts_data': Timestamp(1380144474.44444), - 'ts_ctype': Timestamp(1380144474.44448), - 'ts_meta': Timestamp(1380144475.44444)}) + {'ts_data': timestamps[3], + 'ts_ctype': timestamps[4], + 'ts_meta': timestamps[5]}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -916,30 +953,34 @@ class TestSender(BaseTest): self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 ' - b'm:186a0\r\n\r\n' - b'3f\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 ' - b'm:186a0,t:4\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % ( + timestamps[0].internal.encode('ascii')), + b'9d41d8cd98f00b204e9800998ecf0def %s m:186a0' % ( + timestamps[1].internal.encode('ascii')), + (b'9d41d8cd98f00b204e9800998ecf1def %s ' + b'm:30d40,t:186a0' % ( + timestamps[3].internal.encode('ascii'))), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertEqual(send_map, {}) candidates = [('9d41d8cd98f00b204e9800998ecf0abc', - dict(ts_data=Timestamp(1380144470.00000))), + dict(ts_data=timestamps[0])), ('9d41d8cd98f00b204e9800998ecf0def', - dict(ts_data=Timestamp(1380144472.22222), - ts_meta=Timestamp(1380144473.22222))), + dict(ts_data=timestamps[1], + ts_meta=timestamps[2])), ('9d41d8cd98f00b204e9800998ecf1def', - dict(ts_data=Timestamp(1380144474.44444), - ts_meta=Timestamp(1380144475.44444), - ts_ctype=Timestamp(1380144474.44448)))] + dict(ts_data=timestamps[3], + ts_meta=timestamps[5], + ts_ctype=timestamps[4]))] self.assertEqual(available_map, dict(candidates)) self.assertEqual([], self.daemon_logger.get_lines_for_level('info')) self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_max_objects_less_than_actual_objects(self): + timestamps = [self.ts() for _ in range(6)] + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): # verify missing_check stops after 2 objects even though more # objects would yield @@ -948,16 +989,16 @@ class TestSender(BaseTest): suffixes == ['abc', 'def']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': timestamps[0]}) yield ( '9d41d8cd98f00b204e9800998ecf0def', - {'ts_data': Timestamp(1380144472.22222), - 'ts_meta': Timestamp(1380144473.22222)}) + {'ts_data': timestamps[1], + 'ts_meta': timestamps[2]}) yield ( '9d41d8cd98f00b204e9800998ecf1def', - {'ts_data': Timestamp(1380144474.44444), - 'ts_ctype': Timestamp(1380144474.44448), - 'ts_meta': Timestamp(1380144475.44444)}) + {'ts_data': timestamps[3], + 'ts_ctype': timestamps[4], + 'ts_meta': timestamps[5]}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -982,19 +1023,20 @@ class TestSender(BaseTest): self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 ' - b'm:186a0\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % ( + timestamps[0].internal.encode('ascii')), + b'9d41d8cd98f00b204e9800998ecf0def %s m:186a0' % ( + timestamps[1].internal.encode('ascii')), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertEqual(send_map, {}) candidates = [('9d41d8cd98f00b204e9800998ecf0abc', - dict(ts_data=Timestamp(1380144470.00000))), + dict(ts_data=timestamps[0])), ('9d41d8cd98f00b204e9800998ecf0def', - dict(ts_data=Timestamp(1380144472.22222), - ts_meta=Timestamp(1380144473.22222)))] + dict(ts_data=timestamps[1], + ts_meta=timestamps[2]))] self.assertEqual(available_map, dict(candidates)) self.assertEqual( ['ssync missing_check truncated after 2 objects: device: dev, ' @@ -1004,17 +1046,19 @@ class TestSender(BaseTest): self.assertTrue(self.sender.limited_by_max_objects) def test_missing_check_max_objects_exactly_actual_objects(self): + timestamps = [self.ts() for _ in range(3)] + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == POLICIES.legacy and suffixes == ['abc', 'def']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': timestamps[0]}) yield ( '9d41d8cd98f00b204e9800998ecf0def', - {'ts_data': Timestamp(1380144472.22222), - 'ts_meta': Timestamp(1380144473.22222)}) + {'ts_data': timestamps[1], + 'ts_meta': timestamps[2]}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -1039,32 +1083,35 @@ class TestSender(BaseTest): self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 ' - b'm:186a0\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % ( + timestamps[0].internal.encode('ascii')), + b'9d41d8cd98f00b204e9800998ecf0def %s m:186a0' % ( + timestamps[1].internal.encode('ascii')), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertEqual(send_map, {}) candidates = [('9d41d8cd98f00b204e9800998ecf0abc', - dict(ts_data=Timestamp(1380144470.00000))), + dict(ts_data=timestamps[0])), ('9d41d8cd98f00b204e9800998ecf0def', - dict(ts_data=Timestamp(1380144472.22222), - ts_meta=Timestamp(1380144473.22222)))] + dict(ts_data=timestamps[1], + ts_meta=timestamps[2]))] self.assertEqual(available_map, dict(candidates)) # nothing logged re: truncation self.assertEqual([], self.daemon_logger.get_lines_for_level('info')) self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_far_end_disconnect(self): + ts = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == POLICIES.legacy and suffixes == ['abc']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -1086,21 +1133,24 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % ( + ts.internal.encode('ascii')), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_far_end_disconnect2(self): + ts = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == POLICIES.legacy and suffixes == ['abc']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -1123,21 +1173,24 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % ( + ts.internal.encode('ascii')), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_far_end_unexpected(self): + ts = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == POLICIES.legacy and suffixes == ['abc']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -1159,21 +1212,24 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'OH HAI'") - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % ( + ts.internal.encode('ascii')), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_send_map(self): + ts = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == POLICIES.legacy and suffixes == ['abc']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -1195,27 +1251,30 @@ class TestSender(BaseTest): self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + b'9d41d8cd98f00b204e9800998ecf0abc %s' % ( + ts.internal.encode('ascii')), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertEqual(send_map, {'0123abc': {'data': True, 'meta': True}}) self.assertEqual(available_map, dict([('9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)})])) + {'ts_data': ts})])) self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_extra_line_parts(self): # check that sender tolerates extra parts in missing check # line responses to allow for protocol upgrades + ts = self.ts() + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == POLICIES.legacy and suffixes == ['abc']): yield ( '9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)}) + {'ts_data': ts}) else: raise Exception( 'No match for %r %r %r %r' % (device, partition, @@ -1240,7 +1299,7 @@ class TestSender(BaseTest): self.assertEqual(send_map, {'0123abc': {'data': True}}) self.assertEqual(available_map, dict([('9d41d8cd98f00b204e9800998ecf0abc', - {'ts_data': Timestamp(1380144470.00000)})])) + {'ts_data': ts})])) self.assertFalse(self.sender.limited_by_max_objects) def test_updates_timeout(self): @@ -1258,10 +1317,10 @@ class TestSender(BaseTest): ':UPDATES: START\r\n' ':UPDATES: END\r\n')) self.sender.updates(connection, response, {}) - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_unexpected_response_lines1(self): connection = FakeConnection() @@ -1276,10 +1335,10 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'abc'") - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_unexpected_response_lines2(self): connection = FakeConnection() @@ -1294,10 +1353,10 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'abc'") - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_is_deleted(self): device = 'dev' @@ -1328,10 +1387,10 @@ class TestSender(BaseTest): self.assertEqual(self.sender.send_put.mock_calls, []) # note that the delete line isn't actually sent since we mock # send_delete; send_delete is tested separately. - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_update_send_delete(self): device = 'dev' @@ -1339,7 +1398,7 @@ class TestSender(BaseTest): object_parts = ('a', 'c', 'o') df = self._make_open_diskfile(device, part, *object_parts) object_hash = utils.hash_path(*object_parts) - delete_timestamp = utils.normalize_timestamp(time.time()) + delete_timestamp = utils.normalize_timestamp(Timestamp.now()) df.delete(delete_timestamp) connection = FakeConnection() self.sender.job = { @@ -1355,15 +1414,12 @@ class TestSender(BaseTest): ':UPDATES: START\r\n' ':UPDATES: END\r\n')) self.sender.updates(connection, response, send_map) - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'30\r\n' - b'DELETE /a/c/o\r\n' - b'X-Timestamp: %s\r\n\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n' - % delete_timestamp.encode('ascii') - ) + expected_messages = [ + b':UPDATES: START', + b'DELETE /a/c/o\r\nX-Timestamp: %s\r\n' % + delete_timestamp.encode('ascii'), + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_put(self): # sender has data file and meta file @@ -1408,10 +1464,10 @@ class TestSender(BaseTest): self.assertEqual(expected, df.get_metadata()) # note that the put line isn't actually sent since we mock send_put; # send_put is tested separately. - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_post(self): ts_iter = make_timestamp_iter() @@ -1455,10 +1511,10 @@ class TestSender(BaseTest): self.assertEqual(expected, df.get_metadata()) # note that the post line isn't actually sent since we mock send_post; # send_post is tested separately. - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_put_and_post(self): ts_iter = make_timestamp_iter() @@ -1507,10 +1563,10 @@ class TestSender(BaseTest): self.assertEqual(path, '/a/c/o') self.assertIsInstance(df, diskfile.DiskFile) self.assertEqual(expected, df.get_metadata()) - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_storage_policy_index(self): device = 'dev' @@ -1570,10 +1626,10 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_read_response_unexp_start(self): connection = FakeConnection() @@ -1588,10 +1644,10 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'anything else'") - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_read_response_timeout_end(self): connection = FakeConnection() @@ -1624,10 +1680,10 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_updates_read_response_unexp_end(self): connection = FakeConnection() @@ -1642,10 +1698,10 @@ class TestSender(BaseTest): except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'anything else'") - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + self.assertConnectionMessages(expected_messages, connection.sent) def test_send_delete_timeout(self): connection = FakeConnection() @@ -1661,14 +1717,12 @@ class TestSender(BaseTest): def test_send_delete(self): connection = FakeConnection() - self.sender.send_delete(connection, '/a/c/o', - utils.Timestamp('1381679759.90941')) - self.assertEqual( - b''.join(connection.sent), - b'30\r\n' - b'DELETE /a/c/o\r\n' - b'X-Timestamp: 1381679759.90941\r\n' - b'\r\n\r\n') + ts = self.ts() + self.sender.send_delete(connection, '/a/c/o', ts) + expected_messages = [ + b'DELETE /a/c/o\r\nX-Timestamp: %s\r\n' + % ts.internal.encode('ascii')] + self.assertConnectionMessages(expected_messages, connection.sent) def test_send_put_initial_timeout(self): df = self._make_open_diskfile() @@ -1707,8 +1761,7 @@ class TestSender(BaseTest): def _check_send_put(self, obj_name, meta_value, meta_name='Unicode-Meta-Name', durable=True): - ts_iter = make_timestamp_iter() - t1 = next(ts_iter) + t1 = self.ts() body = b'test' extra_metadata = {'Some-Other-Header': 'value', meta_name: meta_value} @@ -1720,38 +1773,30 @@ class TestSender(BaseTest): commit=durable) expected = dict(df.get_metadata()) expected['body'] = body.decode('ascii') - expected['chunk_size'] = len(body) expected['meta'] = meta_value expected['meta_name'] = meta_name path = urllib.parse.quote(expected['name']) expected['path'] = path no_commit = '' if durable else 'X-Backend-No-Commit: True\r\n' expected['no_commit'] = no_commit - length = 128 + len(path) + len(meta_value) + len(no_commit) + \ - len(meta_name) - expected['length'] = format(length, 'x') # .meta file metadata is not included in expected for data only PUT - t2 = next(ts_iter) + t2 = self.ts() metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'} df.write_metadata(metadata) df.open() connection = FakeConnection() self.sender.send_put(connection, path, df, durable=durable) - expected = ( - '%(length)s\r\n' - 'PUT %(path)s\r\n' - 'Content-Length: %(Content-Length)s\r\n' - 'ETag: %(ETag)s\r\n' - 'Some-Other-Header: value\r\n' - '%(meta_name)s: %(meta)s\r\n' - '%(no_commit)s' - 'X-Timestamp: %(X-Timestamp)s\r\n' - '\r\n' - '\r\n' - '%(chunk_size)s\r\n' - '%(body)s\r\n' % expected) - expected = wsgi_to_bytes(expected) - self.assertEqual(b''.join(connection.sent), expected) + expected_messages = [ + ('PUT %(path)s\r\n' + 'Content-Length: %(Content-Length)s\r\n' + 'ETag: %(ETag)s\r\n' + 'Some-Other-Header: value\r\n' + '%(meta_name)s: %(meta)s\r\n' + '%(no_commit)s' + 'X-Timestamp: %(X-Timestamp)s\r\n\r\n' % expected), + '%(body)s' % expected] + self.assertConnectionMessages(expected_messages, connection.sent, + cmds_with_newlines=True) def test_send_put(self): self._check_send_put('o', 'meta') @@ -1788,21 +1833,19 @@ class TestSender(BaseTest): df.write_metadata(newer_metadata) path = urllib.parse.quote(df.read_metadata()['name']) wire_meta = wsgi_to_bytes(meta_value) - length = format(61 + len(path) + len(wire_meta), 'x') connection = FakeConnection() with df.open(): self.sender.send_post(connection, path, df) - self.assertEqual( - b''.join(connection.sent), - b'%s\r\n' - b'POST %s\r\n' - b'X-Object-Meta-Foo: %s\r\n' - b'X-Timestamp: %s\r\n' - b'\r\n' - b'\r\n' % (length.encode('ascii'), path.encode('ascii'), - wire_meta, - ts_1.internal.encode('ascii'))) + expected_messages = [ + (b'POST %s\r\n' + b'X-Object-Meta-Foo: %s\r\n' + b'X-Timestamp: %s\r\n' % ( + path.encode('ascii'), + wire_meta, + ts_1.internal.encode('ascii') + ))] + self.assertConnectionMessages(expected_messages, connection.sent) def test_send_post(self): self._check_send_post('o', 'meta') @@ -1828,7 +1871,7 @@ class TestSender(BaseTest): @patch_policies(with_ec_default=True) -class TestSenderEC(BaseTest): +class TestSenderEC(SenderBase): def setUp(self): skip_if_no_xattrs() super(TestSenderEC, self).setUp() @@ -1879,12 +1922,11 @@ class TestSenderEC(BaseTest): connection = FakeConnection() available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n' + object_hash.encode('utf8') + - b' ' + t1.internal.encode('utf8') + b'\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + object_hash.encode('utf8') + b' ' + t1.internal.encode('utf8'), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertEqual( available_map, {object_hash: {'ts_data': t1, 'durable': True}}) @@ -1897,12 +1939,12 @@ class TestSenderEC(BaseTest): connection = FakeConnection() available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'41\r\n' + object_hash.encode('utf8') + - b' ' + t2.internal.encode('utf8') + b' durable:False\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + object_hash.encode('utf8') + b' ' + t2.internal.encode('utf8') + + b' durable:False', + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertEqual( available_map, {object_hash: {'ts_data': t2, 'durable': False}}) @@ -1914,12 +1956,11 @@ class TestSenderEC(BaseTest): connection = FakeConnection() available_map, send_map = self.sender.missing_check(connection, response) - self.assertEqual( - b''.join(connection.sent), - b'17\r\n:MISSING_CHECK: START\r\n\r\n' - b'33\r\n' + object_hash.encode('utf8') + - b' ' + t1.internal.encode('utf8') + b'\r\n\r\n' - b'15\r\n:MISSING_CHECK: END\r\n\r\n') + expected_messages = [ + b':MISSING_CHECK: START', + object_hash.encode('utf8') + b' ' + t1.internal.encode('utf8'), + b':MISSING_CHECK: END'] + self.assertConnectionMessages(expected_messages, connection.sent) self.assertEqual( available_map, {object_hash: {'ts_data': t1, 'durable': True}}) @@ -1981,10 +2022,11 @@ class TestSenderEC(BaseTest): self.assertEqual({'durable': expected_durable_kwarg}, kwargs) # note that the put line isn't actually sent since we mock # send_put; send_put is tested separately. - self.assertEqual( - b''.join(connection.sent), - b'11\r\n:UPDATES: START\r\n\r\n' - b'f\r\n:UPDATES: END\r\n\r\n') + expected_messages = [ + b':UPDATES: START', + b':UPDATES: END'] + + self.assertConnectionMessages(expected_messages, connection.sent) # note: we never expect the (False, False) case check_updates(include_non_durable=False, expected_durable_kwarg=True)