diff --git a/swift/obj/server.py b/swift/obj/server.py index ac3c7f39e5..e1d047d265 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -250,7 +250,8 @@ class ObjectController(BaseStorageServer): {'ip': ip, 'port': port, 'dev': contdevice}) data = {'op': op, 'account': account, 'container': container, 'obj': obj, 'headers': headers_out} - timestamp = headers_out['x-timestamp'] + timestamp = headers_out.get('x-meta-timestamp', + headers_out.get('x-timestamp')) self._diskfile_router[policy].pickle_async_update( objdevice, account, container, obj, data, timestamp, policy) @@ -565,6 +566,7 @@ class ObjectController(BaseStorageServer): content_type_headers['Content-Type'] += (';swift_bytes=%s' % swift_bytes) + # object POST updates are PUT to the container server self.container_update( 'PUT', account, container, obj, request, HeaderKeyDict({ diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 40c37ee39c..bb296a07e6 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -48,6 +48,7 @@ from test.unit import FakeLogger, debug_logger, mocked_http_conn, \ make_timestamp_iter, DEFAULT_TEST_EC_TYPE from test.unit import connect_tcp, readuntil2crlfs, patch_policies from swift.obj import server as object_server +from swift.obj import updater from swift.obj import diskfile from swift.common import utils, bufferedhttp from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ @@ -697,6 +698,150 @@ class TestObjectController(unittest.TestCase): self._test_POST_container_updates( POLICIES[1], update_etag='override_etag') + def _test_PUT_then_POST_async_pendings(self, policy, update_etag=None): + # Test that PUT and POST requests result in distinct async pending + # files when sync container update fails. + def fake_http_connect(*args): + raise Exception('test') + + device_dir = os.path.join(self.testdir, 'sda1') + ts_iter = make_timestamp_iter() + t_put = ts_iter.next() + update_etag = update_etag or '098f6bcd4621d373cade4e832627b4f6' + + put_headers = { + 'X-Trans-Id': 'put_trans_id', + 'X-Timestamp': t_put.internal, + 'Content-Type': 'application/octet-stream;swift_bytes=123456789', + 'Content-Length': '4', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Container-Host': 'chost:cport', + 'X-Container-Partition': 'cpartition', + 'X-Container-Device': 'cdevice'} + if policy.policy_type == EC_POLICY: + put_headers.update({ + 'X-Object-Sysmeta-Ec-Frag-Index': '2', + 'X-Backend-Container-Update-Override-Etag': update_etag, + 'X-Object-Sysmeta-Ec-Etag': update_etag}) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers=put_headers, body='test') + + with mock.patch('swift.obj.server.http_connect', fake_http_connect): + with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''): + resp = req.get_response(self.object_controller) + + self.assertEqual(resp.status_int, 201) + + async_pending_file_put = os.path.join( + device_dir, diskfile.get_async_dir(policy), 'a83', + '06fbf0b514e5199dfc4e00f42eb5ea83-%s' % t_put.internal) + self.assertTrue(os.path.isfile(async_pending_file_put), + 'Expected %s to be a file but it is not.' + % async_pending_file_put) + expected_put_headers = { + 'Referer': 'PUT http://localhost/sda1/p/a/c/o', + 'X-Trans-Id': 'put_trans_id', + 'X-Timestamp': t_put.internal, + 'X-Content-Type': 'application/octet-stream;swift_bytes=123456789', + 'X-Size': '4', + 'X-Etag': '098f6bcd4621d373cade4e832627b4f6', + 'User-Agent': 'object-server %s' % os.getpid(), + 'X-Backend-Storage-Policy-Index': '%d' % int(policy)} + if policy.policy_type == EC_POLICY: + expected_put_headers['X-Etag'] = update_etag + self.assertDictEqual( + pickle.load(open(async_pending_file_put)), + {'headers': expected_put_headers, + 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) + + # POST with newer metadata returns success and container update + # is expected + t_post = ts_iter.next() + post_headers = { + 'X-Trans-Id': 'post_trans_id', + 'X-Timestamp': t_post.internal, + 'Content-Type': 'application/other', + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Container-Host': 'chost:cport', + 'X-Container-Partition': 'cpartition', + 'X-Container-Device': 'cdevice'} + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, + headers=post_headers) + + with mock.patch('swift.obj.server.http_connect', fake_http_connect): + with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''): + resp = req.get_response(self.object_controller) + + self.assertEqual(resp.status_int, 202) + + self.maxDiff = None + # check async pending file for PUT is still intact + self.assertDictEqual( + pickle.load(open(async_pending_file_put)), + {'headers': expected_put_headers, + 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) + + # check distinct async pending file for POST + async_pending_file_post = os.path.join( + device_dir, diskfile.get_async_dir(policy), 'a83', + '06fbf0b514e5199dfc4e00f42eb5ea83-%s' % t_post.internal) + self.assertTrue(os.path.isfile(async_pending_file_post), + 'Expected %s to be a file but it is not.' + % async_pending_file_post) + expected_post_headers = { + 'Referer': 'POST http://localhost/sda1/p/a/c/o', + 'X-Trans-Id': 'post_trans_id', + 'X-Timestamp': t_put.internal, + 'X-Content-Type': 'application/other;swift_bytes=123456789', + 'X-Size': '4', + 'X-Etag': '098f6bcd4621d373cade4e832627b4f6', + 'User-Agent': 'object-server %s' % os.getpid(), + 'X-Backend-Storage-Policy-Index': '%d' % int(policy), + 'X-Meta-Timestamp': t_post.internal, + 'X-Content-Type-Timestamp': t_post.internal, + } + if policy.policy_type == EC_POLICY: + expected_post_headers['X-Etag'] = update_etag + self.assertDictEqual( + pickle.load(open(async_pending_file_post)), + {'headers': expected_post_headers, + 'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'}) + + # verify that only the POST (most recent) async update gets sent by the + # object updater, and that both update files are deleted + with mock.patch( + 'swift.obj.updater.ObjectUpdater.object_update') as mock_update, \ + mock.patch('swift.obj.updater.dump_recon_cache'): + object_updater = updater.ObjectUpdater( + {'devices': self.testdir, + 'mount_check': 'false'}, logger=debug_logger()) + node = {'id': 1} + mock_ring = mock.MagicMock() + mock_ring.get_nodes.return_value = (99, [node]) + object_updater.container_ring = mock_ring + mock_update.return_value = ((True, 1)) + object_updater.run_once() + self.assertEqual(1, mock_update.call_count) + self.assertEqual((node, 99, 'PUT', '/a/c/o'), + mock_update.call_args_list[0][0][0:4]) + actual_headers = mock_update.call_args_list[0][0][4] + self.assertTrue( + actual_headers.pop('user-agent').startswith('object-updater')) + self.assertDictEqual(expected_post_headers, actual_headers) + self.assertFalse( + os.listdir(os.path.join( + device_dir, diskfile.get_async_dir(policy)))) + + def test_PUT_then_POST_async_updates_with_repl_policy(self): + self._test_PUT_then_POST_async_pendings(POLICIES[0]) + + def test_PUT_then_POST_async_updates_with_EC_policy(self): + self._test_PUT_then_POST_async_pendings( + POLICIES[1], update_etag='override_etag') + def test_POST_quarantine_zbyte(self): timestamp = normalize_timestamp(time()) req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},