From 9db7391e55e069d82f780c4372ffa32ef4e79c35 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Mon, 7 Mar 2016 18:42:01 +0000 Subject: [PATCH] Object POST update gets distinct async pending file Each object update to a container server is saved in a pending file if the initial update attempt fails. Pending file names were derived from the update request's x-timestamp, which is equal to the object's data file timestamp. This meant that updates due to an object POST used the same async pending file as updates due to the object's PUT. This is not so bad because the object POST update has a superset of the metadata included in the PUT update. But there is a risk of a race condition causing an update to be lost: the updater may open an update file due to a PUT whuile the object server is writing an update due to a POST to the same file name. The updater could then unlink the file before the more recent update for the POST is sent. This patch changes the POST update pending file name to be derived from the object's metadata timestamp, thus making it distinct from the PUT update pending file name. There is no upgrade impact since existing pending files will continue to be processed. Change-Id: I1b093c837efe8c2a64e92075ebd5e1b93e30efb9 --- swift/obj/server.py | 4 +- test/unit/obj/test_server.py | 145 +++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/swift/obj/server.py b/swift/obj/server.py index ac3c7f39e5..e1d047d265 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -250,7 +250,8 @@ class ObjectController(BaseStorageServer): {'ip': ip, 'port': port, 'dev': contdevice}) data = {'op': op, 'account': account, 'container': container, 'obj': obj, 'headers': headers_out} - timestamp = headers_out['x-timestamp'] + timestamp = headers_out.get('x-meta-timestamp', + headers_out.get('x-timestamp')) self._diskfile_router[policy].pickle_async_update( objdevice, account, container, obj, data, timestamp, policy) @@ -565,6 +566,7 @@ class ObjectController(BaseStorageServer): content_type_headers['Content-Type'] += (';swift_bytes=%s' % swift_bytes) + # object POST updates are PUT to the container server self.container_update( 'PUT', account, container, obj, request, HeaderKeyDict({ diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 40c37ee39c..bb296a07e6 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -48,6 +48,7 @@ from test.unit import FakeLogger, debug_logger, mocked_http_conn, \ make_timestamp_iter, DEFAULT_TEST_EC_TYPE from test.unit import connect_tcp, readuntil2crlfs, patch_policies from swift.obj import server as object_server +from swift.obj import updater from swift.obj import diskfile from swift.common import utils, bufferedhttp from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ @@ -697,6 +698,150 @@ class TestObjectController(unittest.TestCase): self._test_POST_container_updates( POLICIES[1], update_etag='override_etag') + def _test_PUT_then_POST_async_pendings(self, policy, update_etag=None): + # Test that PUT and POST requests result in distinct async pending + # files when sync container update fails. + def fake_http_connect(*args): + raise Exception('test') + + device_dir = os.path.join(self.testdir, 'sda1') + ts_iter = make_timestamp_iter() + t_put = ts_iter.next() + update_etag = update_etag or '098f6bcd4621d373cade4e832627b4f6' + + put_headers = { + 'X-Trans-Id': 'put_trans_id', + 'X-Timestamp': t_put.internal, + 'Content-Type': 'application/octet-stream;swift_bytes=123456789', + 'Content-Length': '4', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Container-Host': 'chost:cport', + 'X-Container-Partition': 'cpartition', + 'X-Container-Device': 'cdevice'} + if policy.policy_type == EC_POLICY: + put_headers.update({ + 'X-Object-Sysmeta-Ec-Frag-Index': '2', + 'X-Backend-Container-Update-Override-Etag': update_etag, + 'X-Object-Sysmeta-Ec-Etag': update_etag}) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers=put_headers, body='test') + + with mock.patch('swift.obj.server.http_connect', fake_http_connect): + with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''): + resp = req.get_response(self.object_controller) + + self.assertEqual(resp.status_int, 201) + + async_pending_file_put = os.path.join( + device_dir, diskfile.get_async_dir(policy), 'a83', + '06fbf0b514e5199dfc4e00f42eb5ea83-%s' % t_put.internal) + self.assertTrue(os.path.isfile(async_pending_file_put), + 'Expected %s to be a file but it is not.' + % async_pending_file_put) + expected_put_headers = { + 'Referer': 'PUT http://localhost/sda1/p/a/c/o', + 'X-Trans-Id': 'put_trans_id', + 'X-Timestamp': t_put.internal, + 'X-Content-Type': 'application/octet-stream;swift_bytes=123456789', + 'X-Size': '4', + 'X-Etag': '098f6bcd4621d373cade4e832627b4f6', + 'User-Agent': 'object-server %s' % os.getpid(), + 'X-Backend-Storage-Policy-Index': '%d' % int(policy)} + if policy.policy_type == EC_POLICY: + expected_put_headers['X-Etag'] = update_etag + self.assertDictEqual( + pickle.load(open(async_pending_file_put)), + {'headers': expected_put_headers, + 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) + + # POST with newer metadata returns success and container update + # is expected + t_post = ts_iter.next() + post_headers = { + 'X-Trans-Id': 'post_trans_id', + 'X-Timestamp': t_post.internal, + 'Content-Type': 'application/other', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Container-Host': 'chost:cport', + 'X-Container-Partition': 'cpartition', + 'X-Container-Device': 'cdevice'} + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, + headers=post_headers) + + with mock.patch('swift.obj.server.http_connect', fake_http_connect): + with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''): + resp = req.get_response(self.object_controller) + + self.assertEqual(resp.status_int, 202) + + self.maxDiff = None + # check async pending file for PUT is still intact + self.assertDictEqual( + pickle.load(open(async_pending_file_put)), + {'headers': expected_put_headers, + 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) + + # check distinct async pending file for POST + async_pending_file_post = os.path.join( + device_dir, diskfile.get_async_dir(policy), 'a83', + '06fbf0b514e5199dfc4e00f42eb5ea83-%s' % t_post.internal) + self.assertTrue(os.path.isfile(async_pending_file_post), + 'Expected %s to be a file but it is not.' + % async_pending_file_post) + expected_post_headers = { + 'Referer': 'POST http://localhost/sda1/p/a/c/o', + 'X-Trans-Id': 'post_trans_id', + 'X-Timestamp': t_put.internal, + 'X-Content-Type': 'application/other;swift_bytes=123456789', + 'X-Size': '4', + 'X-Etag': '098f6bcd4621d373cade4e832627b4f6', + 'User-Agent': 'object-server %s' % os.getpid(), + 'X-Backend-Storage-Policy-Index': '%d' % int(policy), + 'X-Meta-Timestamp': t_post.internal, + 'X-Content-Type-Timestamp': t_post.internal, + } + if policy.policy_type == EC_POLICY: + expected_post_headers['X-Etag'] = update_etag + self.assertDictEqual( + pickle.load(open(async_pending_file_post)), + {'headers': expected_post_headers, + 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) + + # verify that only the POST (most recent) async update gets sent by the + # object updater, and that both update files are deleted + with mock.patch( + 'swift.obj.updater.ObjectUpdater.object_update') as mock_update, \ + mock.patch('swift.obj.updater.dump_recon_cache'): + object_updater = updater.ObjectUpdater( + {'devices': self.testdir, + 'mount_check': 'false'}, logger=debug_logger()) + node = {'id': 1} + mock_ring = mock.MagicMock() + mock_ring.get_nodes.return_value = (99, [node]) + object_updater.container_ring = mock_ring + mock_update.return_value = ((True, 1)) + object_updater.run_once() + self.assertEqual(1, mock_update.call_count) + self.assertEqual((node, 99, 'PUT', '/a/c/o'), + mock_update.call_args_list[0][0][0:4]) + actual_headers = mock_update.call_args_list[0][0][4] + self.assertTrue( + actual_headers.pop('user-agent').startswith('object-updater')) + self.assertDictEqual(expected_post_headers, actual_headers) + self.assertFalse( + os.listdir(os.path.join( + device_dir, diskfile.get_async_dir(policy)))) + + def test_PUT_then_POST_async_updates_with_repl_policy(self): + self._test_PUT_then_POST_async_pendings(POLICIES[0]) + + def test_PUT_then_POST_async_updates_with_EC_policy(self): + self._test_PUT_then_POST_async_pendings( + POLICIES[1], update_etag='override_etag') + def test_POST_quarantine_zbyte(self): timestamp = normalize_timestamp(time()) req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},