Fix invalid frag_index header in ssync_sender when reverting EC tombstones
Back in d124ce [1] we failed to recognize the situation where a revert job would have an explicit frag_index key with the literal value None which would take precedence over the dict.get's default value of ''. Later in ssync_receiver we'd bump into the ValueError converting 'None' to an int (again). In ssync_sender we now handle literal None's correctly and should hopefully no longer put these invalid headers on the wire - but for belts and braces we'll also update ssync_receiver to raise a 400 series error and ssync_sender to better log the error messages. 1. https://review.openstack.org/#/c/195457/ Co-Author: Clay Gerrard <clay.gerrard@gmail.com> Co-Author: Alistair Coles <alistair.coles@hp.com> Change-Id: Ic71ba7cc82487773214030207bb193f425319449 Closes-Bug: 1489546
This commit is contained in:
		| @@ -156,13 +156,24 @@ class Receiver(object): | ||||
|         self.request.environ['eventlet.minimum_write_chunk_size'] = 0 | ||||
|         self.device, self.partition, self.policy = \ | ||||
|             request_helpers.get_name_and_placement(self.request, 2, 2, False) | ||||
|  | ||||
|         self.frag_index = self.node_index = None | ||||
|         if self.request.headers.get('X-Backend-Ssync-Frag-Index'): | ||||
|             self.frag_index = int( | ||||
|                 self.request.headers['X-Backend-Ssync-Frag-Index']) | ||||
|             try: | ||||
|                 self.frag_index = int( | ||||
|                     self.request.headers['X-Backend-Ssync-Frag-Index']) | ||||
|             except ValueError: | ||||
|                 raise swob.HTTPBadRequest( | ||||
|                     'Invalid X-Backend-Ssync-Frag-Index %r' % | ||||
|                     self.request.headers['X-Backend-Ssync-Frag-Index']) | ||||
|         if self.request.headers.get('X-Backend-Ssync-Node-Index'): | ||||
|             self.node_index = int( | ||||
|                 self.request.headers['X-Backend-Ssync-Node-Index']) | ||||
|             try: | ||||
|                 self.node_index = int( | ||||
|                     self.request.headers['X-Backend-Ssync-Node-Index']) | ||||
|             except ValueError: | ||||
|                 raise swob.HTTPBadRequest( | ||||
|                     'Invalid X-Backend-Ssync-Node-Index %r' % | ||||
|                     self.request.headers['X-Backend-Ssync-Node-Index']) | ||||
|             if self.node_index != self.frag_index: | ||||
|                 # a primary node should only receive it's own fragments | ||||
|                 raise swob.HTTPBadRequest( | ||||
|   | ||||
| @@ -133,9 +133,16 @@ class Sender(object): | ||||
|             # a sync job must use the node's index for the frag_index of the | ||||
|             # rebuilt fragments instead of the frag_index from the job which | ||||
|             # will be rebuilding them | ||||
|             self.connection.putheader( | ||||
|                 'X-Backend-Ssync-Frag-Index', self.node.get( | ||||
|                     'index', self.job.get('frag_index', ''))) | ||||
|             frag_index = self.node.get('index', self.job.get('frag_index')) | ||||
|             if frag_index is None: | ||||
|                 # replication jobs will not have a frag_index key; | ||||
|                 # reconstructor jobs with only tombstones will have a | ||||
|                 # frag_index key explicitly set to the value of None - in both | ||||
|                 # cases on the wire we write the empty string which | ||||
|                 # ssync_receiver will translate to None | ||||
|                 frag_index = '' | ||||
|             self.connection.putheader('X-Backend-Ssync-Frag-Index', | ||||
|                                       frag_index) | ||||
|             # a revert job to a handoff will not have a node index | ||||
|             self.connection.putheader('X-Backend-Ssync-Node-Index', | ||||
|                                       self.node.get('index', '')) | ||||
| @@ -144,10 +151,10 @@ class Sender(object): | ||||
|                 self.daemon.node_timeout, 'connect receive'): | ||||
|             self.response = self.connection.getresponse() | ||||
|             if self.response.status != http.HTTP_OK: | ||||
|                 self.response.read() | ||||
|                 err_msg = self.response.read()[:1024] | ||||
|                 raise exceptions.ReplicationException( | ||||
|                     'Expected status %s; got %s' % | ||||
|                     (http.HTTP_OK, self.response.status)) | ||||
|                     'Expected status %s; got %s (%s)' % | ||||
|                     (http.HTTP_OK, self.response.status, err_msg)) | ||||
|  | ||||
|     def readline(self): | ||||
|         """ | ||||
|   | ||||
| @@ -14,6 +14,7 @@ | ||||
| # limitations under the License. | ||||
|  | ||||
| import contextlib | ||||
| import hashlib | ||||
| import os | ||||
| import shutil | ||||
| import tempfile | ||||
| @@ -26,7 +27,7 @@ import six | ||||
| from swift.common import bufferedhttp | ||||
| from swift.common import exceptions | ||||
| from swift.common import swob | ||||
| from swift.common.storage_policy import POLICIES | ||||
| from swift.common.storage_policy import POLICIES, REPL_POLICY | ||||
| from swift.common import utils | ||||
| from swift.common.swob import HTTPException | ||||
| from swift.obj import diskfile | ||||
| @@ -255,6 +256,23 @@ class TestReceiver(unittest.TestCase): | ||||
|         self.assertEqual(rcvr.frag_index, 7) | ||||
|         self.assertEqual(rcvr.node_index, 7) | ||||
|  | ||||
|     @unit.patch_policies() | ||||
|     def test_Receiver_with_invalid_indexes(self): | ||||
|         # update router post policy patch | ||||
|         self.controller._diskfile_router = diskfile.DiskFileRouter( | ||||
|             self.conf, self.controller.logger) | ||||
|         req = swob.Request.blank( | ||||
|             '/sda1/1', | ||||
|             environ={'REQUEST_METHOD': 'SSYNC', | ||||
|                      'HTTP_X_BACKEND_SSYNC_NODE_INDEX': 'None', | ||||
|                      'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': 'None', | ||||
|                      'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'}, | ||||
|             body=':MISSING_CHECK: START\r\n' | ||||
|                  ':MISSING_CHECK: END\r\n' | ||||
|                  ':UPDATES: START\r\n:UPDATES: END\r\n') | ||||
|         resp = req.get_response(self.controller) | ||||
|         self.assertEqual(resp.status_int, 400) | ||||
|  | ||||
|     @unit.patch_policies() | ||||
|     def test_Receiver_with_mismatched_indexes(self): | ||||
|         # update router post policy patch | ||||
| @@ -1679,30 +1697,56 @@ class TestSsyncRxServer(unittest.TestCase): | ||||
|     # server socket. | ||||
|  | ||||
|     def setUp(self): | ||||
|         self.ts = unit.make_timestamp_iter() | ||||
|         self.rx_ip = '127.0.0.1' | ||||
|         # dirs | ||||
|         self.tmpdir = tempfile.mkdtemp() | ||||
|         self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server') | ||||
|  | ||||
|         self.devices = os.path.join(self.tempdir, 'srv/node') | ||||
|         self.rx_devices = os.path.join(self.tempdir, 'rx/node') | ||||
|         self.tx_devices = os.path.join(self.tempdir, 'tx/node') | ||||
|         for device in ('sda1', 'sdb1'): | ||||
|             os.makedirs(os.path.join(self.devices, device)) | ||||
|             for root in (self.rx_devices, self.tx_devices): | ||||
|                 os.makedirs(os.path.join(root, device)) | ||||
|  | ||||
|         self.conf = { | ||||
|             'devices': self.devices, | ||||
|             'devices': self.rx_devices, | ||||
|             'swift_dir': self.tempdir, | ||||
|             'mount_check': False, | ||||
|         } | ||||
|         self.rx_logger = debug_logger('test-object-server') | ||||
|         rx_server = server.ObjectController(self.conf, logger=self.rx_logger) | ||||
|         sock = eventlet.listen((self.rx_ip, 0)) | ||||
|         self.rx_app = server.ObjectController(self.conf, logger=self.rx_logger) | ||||
|         self.sock = eventlet.listen((self.rx_ip, 0)) | ||||
|         self.rx_server = eventlet.spawn( | ||||
|             eventlet.wsgi.server, sock, rx_server, utils.NullLogger()) | ||||
|         self.rx_port = sock.getsockname()[1] | ||||
|         self.tx_logger = debug_logger('test-reconstructor') | ||||
|             eventlet.wsgi.server, self.sock, self.rx_app, utils.NullLogger()) | ||||
|         self.rx_port = self.sock.getsockname()[1] | ||||
|         self.tx_logger = debug_logger('test-daemon') | ||||
|         self.policy = POLICIES[0] | ||||
|         self.conf['devices'] = self.tx_devices | ||||
|         self.daemon = ObjectReconstructor(self.conf, self.tx_logger) | ||||
|         self.daemon._diskfile_mgr = self.daemon._df_router[POLICIES[0]] | ||||
|         self.daemon._diskfile_mgr = self.daemon._df_router[self.policy] | ||||
|  | ||||
|         self.nodes = [ | ||||
|             { | ||||
|                 'device': 'sda1', | ||||
|                 'ip': '127.0.0.1', | ||||
|                 'replication_ip': '127.0.0.1', | ||||
|                 'port': self.rx_port, | ||||
|                 'replication_port': self.rx_port, | ||||
|             }, | ||||
|             { | ||||
|                 'device': 'sdb1', | ||||
|                 'ip': '127.0.0.1', | ||||
|                 'replication_ip': '127.0.0.1', | ||||
|                 'port': self.rx_port, | ||||
|                 'replication_port': self.rx_port, | ||||
|             }, | ||||
|         ] | ||||
|  | ||||
|     def tearDown(self): | ||||
|         self.rx_server.kill() | ||||
|         self.sock.close() | ||||
|         eventlet.sleep(0) | ||||
|         shutil.rmtree(self.tmpdir) | ||||
|  | ||||
|     def test_SSYNC_disconnect(self): | ||||
| @@ -1770,6 +1814,107 @@ class TestSsyncRxServer(unittest.TestCase): | ||||
|         # sanity check that the receiver did not proceed to missing_check | ||||
|         self.assertFalse(mock_missing_check.called) | ||||
|  | ||||
|     def test_sender_job_missing_frag_node_indexes(self): | ||||
|         # replication jobs don't send frag_index, so we'll use a REPL_POLICY | ||||
|         repl_policy = POLICIES[1] | ||||
|         self.assertEqual(repl_policy.policy_type, REPL_POLICY) | ||||
|         repl_mgr = self.daemon._df_router[repl_policy] | ||||
|         self.daemon._diskfile_mgr = repl_mgr | ||||
|         device = self.nodes[0]['device'] | ||||
|         # create a replicated object, on sender | ||||
|         df = repl_mgr.get_diskfile(device, '0', 'a', 'c', 'o', | ||||
|                                    policy=repl_policy) | ||||
|         now = next(self.ts) | ||||
|         metadata = { | ||||
|             'X-Timestamp': now.internal, | ||||
|             'Content-Type': 'text/plain', | ||||
|             'Content-Length': '0', | ||||
|             'ETag': hashlib.md5('').hexdigest(), | ||||
|         } | ||||
|         with df.create() as writer: | ||||
|             writer.write('') | ||||
|             writer.put(metadata) | ||||
|         # sanity the object is on the sender | ||||
|         self.assertTrue(df._datadir.startswith(self.tx_devices)) | ||||
|         # setup a ssync job | ||||
|         suffix = os.path.basename(os.path.dirname(df._datadir)) | ||||
|         job = { | ||||
|             'partition': 0, | ||||
|             'policy': repl_policy, | ||||
|             'device': device, | ||||
|         } | ||||
|         sender = ssync_sender.Sender( | ||||
|             self.daemon, self.nodes[0], job, [suffix]) | ||||
|         success, _ = sender() | ||||
|         self.assertTrue(success) | ||||
|         # sanity object is synced to receiver | ||||
|         remote_df = self.rx_app._diskfile_router[repl_policy].get_diskfile( | ||||
|             device, '0', 'a', 'c', 'o', policy=repl_policy) | ||||
|         self.assertTrue(remote_df._datadir.startswith(self.rx_devices)) | ||||
|         self.assertEqual(remote_df.read_metadata(), metadata) | ||||
|  | ||||
|     def test_send_frag_index_none(self): | ||||
|         # create an ec fragment on the remote node | ||||
|         device = self.nodes[1]['device'] | ||||
|         remote_df = self.rx_app._diskfile_router[self.policy].get_diskfile( | ||||
|             device, '1', 'a', 'c', 'o', policy=self.policy) | ||||
|         ts1 = next(self.ts) | ||||
|         data = 'frag_archive' | ||||
|         metadata = { | ||||
|             'ETag': hashlib.md5(data).hexdigest(), | ||||
|             'X-Timestamp': ts1.internal, | ||||
|             'Content-Length': len(data), | ||||
|             'X-Object-Sysmeta-Ec-Frag-Index': '3', | ||||
|         } | ||||
|         with remote_df.create() as writer: | ||||
|             writer.write(data) | ||||
|             writer.put(metadata) | ||||
|             writer.commit(ts1) | ||||
|         # create a tombstone on the local node | ||||
|         df = self.daemon._df_router[self.policy].get_diskfile( | ||||
|             device, '1', 'a', 'c', 'o', policy=self.policy) | ||||
|         suffix = os.path.basename(os.path.dirname(df._datadir)) | ||||
|         ts2 = next(self.ts) | ||||
|         df.delete(ts2) | ||||
|         # a reconstructor revert job with only tombstones will have frag_index | ||||
|         # explicitly set to None | ||||
|         job = { | ||||
|             'frag_index': None, | ||||
|             'partition': 1, | ||||
|             'policy': self.policy, | ||||
|             'device': device, | ||||
|         } | ||||
|         sender = ssync_sender.Sender( | ||||
|             self.daemon, self.nodes[1], job, [suffix]) | ||||
|         success, _ = sender() | ||||
|         self.assertTrue(success) | ||||
|         # diskfile tombstone synced to receiver's datadir with timestamp | ||||
|         self.assertTrue(remote_df._datadir.startswith(self.rx_devices)) | ||||
|         try: | ||||
|             remote_df.read_metadata() | ||||
|         except exceptions.DiskFileDeleted as e: | ||||
|             self.assertEqual(e.timestamp, ts2) | ||||
|         else: | ||||
|             self.fail('Successfully opened remote DiskFile') | ||||
|  | ||||
|     def test_bad_request_invalid_frag_index(self): | ||||
|         with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\ | ||||
|                 as mock_missing_check: | ||||
|             self.connection = bufferedhttp.BufferedHTTPConnection( | ||||
|                 '127.0.0.1:%s' % self.rx_port) | ||||
|             self.connection.putrequest('SSYNC', '/sda1/0') | ||||
|             self.connection.putheader('Transfer-Encoding', 'chunked') | ||||
|             self.connection.putheader('X-Backend-Ssync-Frag-Index', | ||||
|                                       'None') | ||||
|             self.connection.endheaders() | ||||
|             resp = self.connection.getresponse() | ||||
|         self.assertEqual(400, resp.status) | ||||
|         error_msg = resp.read() | ||||
|         self.assertIn("Invalid X-Backend-Ssync-Frag-Index 'None'", error_msg) | ||||
|         resp.close() | ||||
|         # sanity check that the receiver did not proceed to missing_check | ||||
|         self.assertFalse(mock_missing_check.called) | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     unittest.main() | ||||
|   | ||||
| @@ -312,6 +312,74 @@ class TestSender(BaseTestSender): | ||||
|                                  method_name, mock_method.mock_calls, | ||||
|                                  expected_calls)) | ||||
|  | ||||
|     def test_connect_handoff_no_frag(self): | ||||
|         node = dict(replication_ip='1.2.3.4', replication_port=5678, | ||||
|                     device='sda1') | ||||
|         job = dict(partition='9', policy=POLICIES[0]) | ||||
|         self.sender = ssync_sender.Sender(self.daemon, node, job, None) | ||||
|         self.sender.suffixes = ['abc'] | ||||
|         with mock.patch( | ||||
|                 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' | ||||
|         ) as mock_conn_class: | ||||
|             mock_conn = mock_conn_class.return_value | ||||
|             mock_resp = mock.MagicMock() | ||||
|             mock_resp.status = 200 | ||||
|             mock_conn.getresponse.return_value = mock_resp | ||||
|             self.sender.connect() | ||||
|         mock_conn_class.assert_called_once_with('1.2.3.4:5678') | ||||
|         expectations = { | ||||
|             'putrequest': [ | ||||
|                 mock.call('SSYNC', '/sda1/9'), | ||||
|             ], | ||||
|             'putheader': [ | ||||
|                 mock.call('Transfer-Encoding', 'chunked'), | ||||
|                 mock.call('X-Backend-Storage-Policy-Index', 0), | ||||
|                 mock.call('X-Backend-Ssync-Frag-Index', ''), | ||||
|                 mock.call('X-Backend-Ssync-Node-Index', ''), | ||||
|             ], | ||||
|             'endheaders': [mock.call()], | ||||
|         } | ||||
|         for method_name, expected_calls in expectations.items(): | ||||
|             mock_method = getattr(mock_conn, method_name) | ||||
|             self.assertEqual(expected_calls, mock_method.mock_calls, | ||||
|                              'connection method "%s" got %r not %r' % ( | ||||
|                                  method_name, mock_method.mock_calls, | ||||
|                                  expected_calls)) | ||||
|  | ||||
|     def test_connect_handoff_none_frag(self): | ||||
|         node = dict(replication_ip='1.2.3.4', replication_port=5678, | ||||
|                     device='sda1') | ||||
|         job = dict(partition='9', policy=POLICIES[1], frag_index=None) | ||||
|         self.sender = ssync_sender.Sender(self.daemon, node, job, None) | ||||
|         self.sender.suffixes = ['abc'] | ||||
|         with mock.patch( | ||||
|                 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' | ||||
|         ) as mock_conn_class: | ||||
|             mock_conn = mock_conn_class.return_value | ||||
|             mock_resp = mock.MagicMock() | ||||
|             mock_resp.status = 200 | ||||
|             mock_conn.getresponse.return_value = mock_resp | ||||
|             self.sender.connect() | ||||
|         mock_conn_class.assert_called_once_with('1.2.3.4:5678') | ||||
|         expectations = { | ||||
|             'putrequest': [ | ||||
|                 mock.call('SSYNC', '/sda1/9'), | ||||
|             ], | ||||
|             'putheader': [ | ||||
|                 mock.call('Transfer-Encoding', 'chunked'), | ||||
|                 mock.call('X-Backend-Storage-Policy-Index', 1), | ||||
|                 mock.call('X-Backend-Ssync-Frag-Index', ''), | ||||
|                 mock.call('X-Backend-Ssync-Node-Index', ''), | ||||
|             ], | ||||
|             'endheaders': [mock.call()], | ||||
|         } | ||||
|         for method_name, expected_calls in expectations.items(): | ||||
|             mock_method = getattr(mock_conn, method_name) | ||||
|             self.assertEqual(expected_calls, mock_method.mock_calls, | ||||
|                              'connection method "%s" got %r not %r' % ( | ||||
|                                  method_name, mock_method.mock_calls, | ||||
|                                  expected_calls)) | ||||
|  | ||||
|     def test_connect_handoff_replicated(self): | ||||
|         node = dict(replication_ip='1.2.3.4', replication_port=5678, | ||||
|                     device='sda1') | ||||
| @@ -523,6 +591,7 @@ class TestSender(BaseTestSender): | ||||
|         self.assertEqual(candidates, {}) | ||||
|  | ||||
|     def test_connect_send_timeout(self): | ||||
|         self.daemon.node_timeout = 0.01  # make disconnect fail fast | ||||
|         self.daemon.conn_timeout = 0.01 | ||||
|         node = dict(replication_ip='1.2.3.4', replication_port=5678, | ||||
|                     device='sda1') | ||||
| @@ -578,6 +647,7 @@ class TestSender(BaseTestSender): | ||||
|             def getresponse(*args, **kwargs): | ||||
|                 response = FakeResponse() | ||||
|                 response.status = 503 | ||||
|                 response.read = lambda: 'an error message' | ||||
|                 return response | ||||
|  | ||||
|         missing_check_fn = 'swift.obj.ssync_sender.Sender.missing_check' | ||||
| @@ -594,6 +664,7 @@ class TestSender(BaseTestSender): | ||||
|         for line in error_lines: | ||||
|             self.assertTrue(line.startswith( | ||||
|                 '1.2.3.4:5678/sda1/9 Expected status 200; got 503')) | ||||
|             self.assertIn('an error message', line) | ||||
|         # sanity check that Sender did not proceed to missing_check exchange | ||||
|         self.assertFalse(mock_missing_check.called) | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 paul luse
					paul luse