diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 77e931670a..78b2e95aa8 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -196,6 +196,9 @@ class BaseObjectControllerMixin(object): policy = policy or POLICIES.default return policy.quorum + +class CommonObjectControllerMixin(BaseObjectControllerMixin): + # defines tests that are common to all storage policy types def test_iter_nodes_local_first_noops_when_no_affinity(self): # this test needs a stable node order - most don't self.app.sort_nodes = lambda l, *args, **kwargs: l @@ -563,8 +566,10 @@ class BaseObjectControllerMixin(object): } with set_http_connect(*codes, give_connect=capture_headers, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) # sanity check + # this req may or may not succeed depending on the Putter type used + # but that's ok because we're only interested in verifying the + # headers that were sent + req.get_response(self.app) counts = {True: 0, False: 0, None: 0} for headers in captured_headers: @@ -1003,11 +1008,11 @@ class BaseObjectControllerMixin(object): self._check_write_affinity(conf, policy_conf, POLICIES[1], [0], 3 * self.replicas(POLICIES[1])) -# end of BaseObjectControllerMixin +# end of CommonObjectControllerMixin @patch_policies() -class TestReplicatedObjController(BaseObjectControllerMixin, +class TestReplicatedObjController(CommonObjectControllerMixin, unittest.TestCase): controller_cls = obj.ReplicatedObjectController @@ -1110,105 +1115,6 @@ class TestReplicatedObjController(BaseObjectControllerMixin, def test_PUT_with_no_body_and_no_footers(self): self._test_PUT_with_no_footers(test_body='', chunked=False) - def _test_PUT_with_footers(self, test_body=''): - # verify that when footers are required the PUT body is multipart - # and the footers are appended - footers_callback = make_footers_callback(test_body) - env = {'swift.callback.update_footers': footers_callback} - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - environ=env) - req.body = test_body - # send bogus Etag header to differentiate from footer value - req.headers['Etag'] = 'header_etag' - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes' - } - - put_requests = defaultdict( - lambda: {'headers': None, 'chunks': [], 'connection': None}) - - def capture_body(conn, chunk): - put_requests[conn.connection_id]['chunks'].append(chunk) - put_requests[conn.connection_id]['connection'] = conn - - def capture_headers(ip, port, device, part, method, path, headers, - **kwargs): - conn_id = kwargs['connection_id'] - put_requests[conn_id]['headers'] = headers - - resp_headers = { - 'Etag': '"resp_etag"', - # NB: ignored! - 'Some-Header': 'Four', - } - with set_http_connect(*codes, expect_headers=expect_headers, - give_send=capture_body, - give_connect=capture_headers, - headers=resp_headers): - resp = req.get_response(self.app) - - self.assertEqual(resp.status_int, 201) - timestamps = {captured_req['headers']['x-timestamp'] - for captured_req in put_requests.values()} - self.assertEqual(1, len(timestamps), timestamps) - self.assertEqual(dict(resp.headers), { - 'Content-Type': 'text/html; charset=UTF-8', - 'Content-Length': '0', - 'Etag': 'resp_etag', - 'Last-Modified': time.strftime( - "%a, %d %b %Y %H:%M:%S GMT", - time.gmtime(math.ceil(float(timestamps.pop())))), - }) - for connection_id, info in put_requests.items(): - body = unchunk_body(''.join(info['chunks'])) - headers = info['headers'] - boundary = headers['X-Backend-Obj-Multipart-Mime-Boundary'] - self.assertTrue(boundary is not None, - "didn't get boundary for conn %r" % ( - connection_id,)) - self.assertEqual('chunked', headers['Transfer-Encoding']) - self.assertEqual('100-continue', headers['Expect']) - self.assertEqual('yes', headers['X-Backend-Obj-Metadata-Footer']) - self.assertNotIn('X-Backend-Obj-Multiphase-Commit', headers) - self.assertEqual('header_etag', headers['Etag']) - - # email.parser.FeedParser doesn't know how to take a multipart - # message and boundary together and parse it; it only knows how - # to take a string, parse the headers, and figure out the - # boundary on its own. - parser = email.parser.FeedParser() - parser.feed( - "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" % - boundary) - parser.feed(body) - message = parser.close() - - self.assertTrue(message.is_multipart()) # sanity check - mime_parts = message.get_payload() - # notice, no commit confirmation - self.assertEqual(len(mime_parts), 2) - obj_part, footer_part = mime_parts - - self.assertEqual(obj_part['X-Document'], 'object body') - self.assertEqual(test_body, obj_part.get_payload()) - - # validate footer metadata - self.assertEqual(footer_part['X-Document'], 'object metadata') - footer_metadata = json.loads(footer_part.get_payload()) - self.assertTrue(footer_metadata) - expected = {} - footers_callback(expected) - self.assertDictEqual(expected, footer_metadata) - - self.assertTrue(info['connection'].closed) - - def test_PUT_with_body_and_footers(self): - self._test_PUT_with_footers(test_body='asdf') - - def test_PUT_with_no_body_and_footers(self): - self._test_PUT_with_footers() - def test_txn_id_logging_on_PUT(self): req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') self.app.logger.txn_id = req.environ['swift.trans_id'] = 'test-txn-id' @@ -1872,11 +1778,129 @@ class TestReplicatedObjController(BaseObjectControllerMixin, StoragePolicy(3, '15-replica', False)], fake_ring_args=[ {'replicas': 1}, {'replicas': 5}, {'replicas': 8}, {'replicas': 15}]) -class TestReplicatedObjControllerVariousReplicas(BaseObjectControllerMixin, +class TestReplicatedObjControllerVariousReplicas(CommonObjectControllerMixin, unittest.TestCase): controller_cls = obj.ReplicatedObjectController +@patch_policies() +class TestReplicatedObjControllerMimePutter(BaseObjectControllerMixin, + unittest.TestCase): + # tests specific to PUTs using a MimePutter + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes' + } + + def setUp(self): + super(TestReplicatedObjControllerMimePutter, self).setUp() + # force use of a MimePutter + self.app.use_put_v1 = False + + def test_PUT_error(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [503] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def _test_PUT_with_footers(self, test_body=''): + # verify that when footers are required the PUT body is multipart + # and the footers are appended + footers_callback = make_footers_callback(test_body) + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + environ=env) + req.body = test_body + # send bogus Etag header to differentiate from footer value + req.headers['Etag'] = 'header_etag' + codes = [201] * self.replicas() + + put_requests = defaultdict( + lambda: {'headers': None, 'chunks': [], 'connection': None}) + + def capture_body(conn, chunk): + put_requests[conn.connection_id]['chunks'].append(chunk) + put_requests[conn.connection_id]['connection'] = conn + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + conn_id = kwargs['connection_id'] + put_requests[conn_id]['headers'] = headers + + resp_headers = { + 'Etag': '"resp_etag"', + # NB: ignored! + 'Some-Header': 'Four', + } + with set_http_connect(*codes, expect_headers=self.expect_headers, + give_send=capture_body, + give_connect=capture_headers, + headers=resp_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 201) + timestamps = {captured_req['headers']['x-timestamp'] + for captured_req in put_requests.values()} + self.assertEqual(1, len(timestamps), timestamps) + self.assertEqual(dict(resp.headers), { + 'Content-Type': 'text/html; charset=UTF-8', + 'Content-Length': '0', + 'Etag': 'resp_etag', + 'Last-Modified': time.strftime( + "%a, %d %b %Y %H:%M:%S GMT", + time.gmtime(math.ceil(float(timestamps.pop())))), + }) + for connection_id, info in put_requests.items(): + body = unchunk_body(''.join(info['chunks'])) + headers = info['headers'] + boundary = headers['X-Backend-Obj-Multipart-Mime-Boundary'] + self.assertTrue(boundary is not None, + "didn't get boundary for conn %r" % ( + connection_id,)) + self.assertEqual('chunked', headers['Transfer-Encoding']) + self.assertEqual('100-continue', headers['Expect']) + self.assertEqual('yes', headers['X-Backend-Obj-Metadata-Footer']) + self.assertNotIn('X-Backend-Obj-Multiphase-Commit', headers) + self.assertEqual('header_etag', headers['Etag']) + + # email.parser.FeedParser doesn't know how to take a multipart + # message and boundary together and parse it; it only knows how + # to take a string, parse the headers, and figure out the + # boundary on its own. + parser = email.parser.FeedParser() + parser.feed( + "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" % + boundary) + parser.feed(body) + message = parser.close() + + self.assertTrue(message.is_multipart()) # sanity check + mime_parts = message.get_payload() + # notice, no commit confirmation + self.assertEqual(len(mime_parts), 2) + obj_part, footer_part = mime_parts + + self.assertEqual(obj_part['X-Document'], 'object body') + self.assertEqual(test_body, obj_part.get_payload()) + + # validate footer metadata + self.assertEqual(footer_part['X-Document'], 'object metadata') + footer_metadata = json.loads(footer_part.get_payload()) + self.assertTrue(footer_metadata) + expected = {} + footers_callback(expected) + self.assertDictEqual(expected, footer_metadata) + + self.assertTrue(info['connection'].closed) + + def test_PUT_with_body_and_footers(self): + self._test_PUT_with_footers(test_body='asdf') + + def test_PUT_with_no_body_and_footers(self): + self._test_PUT_with_footers() + + @contextmanager def capture_http_requests(get_response): @@ -1885,11 +1909,21 @@ def capture_http_requests(get_response): def __init__(self, req): self.req = req self.resp = None + self.path = "/" def getresponse(self): self.resp = get_response(self.req) return self.resp + def putrequest(self, method, path, **kwargs): + pass + + def putheader(self, k, v): + pass + + def endheaders(self): + pass + class ConnectionLog(object): def __init__(self): @@ -1925,7 +1959,7 @@ def capture_http_requests(get_response): yield fake_conn -class ECObjectControllerMixin(BaseObjectControllerMixin): +class ECObjectControllerMixin(CommonObjectControllerMixin): # Add a few helper methods for EC tests. def _make_ec_archive_bodies(self, test_body, policy=None): policy = policy or self.policy @@ -2251,546 +2285,6 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(len(real_body), len(resp.body)) self.assertEqual(real_body, resp.body) - def test_PUT_simple(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - - def test_PUT_with_body_and_bad_etag(self): - segment_size = self.policy.ec_segment_size - test_body = ('asdf' * segment_size)[:-10] - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - conns = [] - - def capture_expect(conn): - # stash the backend connection so we can verify that it is closed - # (no data will be sent) - conns.append(conn) - - # send a bad etag in the request headers - headers = {'Etag': 'bad etag'} - req = swift.common.swob.Request.blank( - '/v1/a/c/o', method='PUT', headers=headers, body=test_body) - with set_http_connect(*codes, expect_headers=expect_headers, - give_expect=capture_expect): - resp = req.get_response(self.app) - self.assertEqual(422, resp.status_int) - self.assertEqual(self.replicas(), len(conns)) - for conn in conns: - self.assertTrue(conn.closed) - - # make the footers callback send a bad Etag footer - footers_callback = make_footers_callback('not the test body') - env = {'swift.callback.update_footers': footers_callback} - req = swift.common.swob.Request.blank( - '/v1/a/c/o', method='PUT', environ=env, body=test_body) - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(422, resp.status_int) - - def test_txn_id_logging_ECPUT(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - self.app.logger.txn_id = req.environ['swift.trans_id'] = 'test-txn-id' - codes = [(100, Timeout(), 503, 503)] * self.replicas() - stdout = BytesIO() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers), \ - mock.patch('sys.stdout', stdout): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 503) - for line in stdout.getvalue().splitlines(): - self.assertIn('test-txn-id', line) - self.assertIn('Trying to get ', - stdout.getvalue()) - - def test_PUT_with_explicit_commit_status(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [(100, 100, 201)] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - - def test_PUT_error(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [503] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 503) - - def test_PUT_mostly_success(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [201] * self.quorum() - codes += [503] * (self.replicas() - len(codes)) - random.shuffle(codes) - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - - def test_PUT_error_commit(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [(100, 503, Exception('not used'))] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 503) - - def test_PUT_mostly_success_commit(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [201] * self.quorum() - codes += [(100, 503, Exception('not used'))] * ( - self.replicas() - len(codes)) - random.shuffle(codes) - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - - def test_PUT_mostly_error_commit(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [(100, 503, Exception('not used'))] * self.quorum() - if isinstance(self.policy, ECStoragePolicy): - codes *= self.policy.ec_duplication_factor - codes += [201] * (self.replicas() - len(codes)) - random.shuffle(codes) - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 503) - - def test_PUT_commit_timeout(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [201] * (self.replicas() - 1) - codes.append((100, Timeout(), Exception('not used'))) - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - - def test_PUT_commit_exception(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - codes = [201] * (self.replicas() - 1) - codes.append((100, Exception('kaboom!'), Exception('not used'))) - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - - def test_PUT_ec_error_during_transfer_data(self): - class FakeReader(object): - def read(self, size): - raise IOError('error message') - - req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', - body='test body') - - req.environ['wsgi.input'] = FakeReader() - req.headers['content-length'] = '6' - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - - self.assertEqual(resp.status_int, 499) - - def test_PUT_ec_chunkreadtimeout_during_transfer_data(self): - class FakeReader(object): - def read(self, size): - raise exceptions.ChunkReadTimeout() - - req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', - body='test body') - - req.environ['wsgi.input'] = FakeReader() - req.headers['content-length'] = '6' - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - - self.assertEqual(resp.status_int, 408) - - def test_PUT_ec_timeout_during_transfer_data(self): - class FakeReader(object): - def read(self, size): - raise exceptions.Timeout() - - req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', - body='test body') - - req.environ['wsgi.input'] = FakeReader() - req.headers['content-length'] = '6' - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - - self.assertEqual(resp.status_int, 499) - - def test_PUT_ec_exception_during_transfer_data(self): - class FakeReader(object): - def read(self, size): - raise Exception('exception message') - - req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', - body='test body') - - req.environ['wsgi.input'] = FakeReader() - req.headers['content-length'] = '6' - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - - self.assertEqual(resp.status_int, 500) - - def test_PUT_with_body(self): - segment_size = self.policy.ec_segment_size - test_body = ('asdf' * segment_size)[:-10] - # make the footers callback not include Etag footer so that we can - # verify that the correct EC-calculated Etag is included in footers - # sent to backend - footers_callback = make_footers_callback() - env = {'swift.callback.update_footers': footers_callback} - req = swift.common.swob.Request.blank( - '/v1/a/c/o', method='PUT', environ=env) - etag = md5(test_body).hexdigest() - size = len(test_body) - req.body = test_body - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - resp_headers = { - 'Some-Other-Header': 'Four', - 'Etag': 'ignored', - } - - put_requests = defaultdict(lambda: {'boundary': None, 'chunks': []}) - - def capture_body(conn, chunk): - put_requests[conn.connection_id]['chunks'].append(chunk) - - def capture_headers(ip, port, device, part, method, path, headers, - **kwargs): - conn_id = kwargs['connection_id'] - put_requests[conn_id]['boundary'] = headers[ - 'X-Backend-Obj-Multipart-Mime-Boundary'] - put_requests[conn_id]['backend-content-length'] = headers[ - 'X-Backend-Obj-Content-Length'] - put_requests[conn_id]['x-timestamp'] = headers[ - 'X-Timestamp'] - - with set_http_connect(*codes, expect_headers=expect_headers, - give_send=capture_body, - give_connect=capture_headers, - headers=resp_headers): - resp = req.get_response(self.app) - - self.assertEqual(resp.status_int, 201) - timestamps = {captured_req['x-timestamp'] - for captured_req in put_requests.values()} - self.assertEqual(1, len(timestamps), timestamps) - self.assertEqual(dict(resp.headers), { - 'Content-Type': 'text/html; charset=UTF-8', - 'Content-Length': '0', - 'Last-Modified': time.strftime( - "%a, %d %b %Y %H:%M:%S GMT", - time.gmtime(math.ceil(float(timestamps.pop())))), - 'Etag': etag, - }) - frag_archives = [] - for connection_id, info in put_requests.items(): - body = unchunk_body(''.join(info['chunks'])) - self.assertIsNotNone(info['boundary'], - "didn't get boundary for conn %r" % ( - connection_id,)) - self.assertTrue(size > int(info['backend-content-length']) > 0, - "invalid backend-content-length for conn %r" % ( - connection_id,)) - - # email.parser.FeedParser doesn't know how to take a multipart - # message and boundary together and parse it; it only knows how - # to take a string, parse the headers, and figure out the - # boundary on its own. - parser = email.parser.FeedParser() - parser.feed( - "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" % - info['boundary']) - parser.feed(body) - message = parser.close() - - self.assertTrue(message.is_multipart()) # sanity check - mime_parts = message.get_payload() - self.assertEqual(len(mime_parts), 3) - obj_part, footer_part, commit_part = mime_parts - - # attach the body to frag_archives list - self.assertEqual(obj_part['X-Document'], 'object body') - frag_archives.append(obj_part.get_payload()) - - # assert length was correct for this connection - self.assertEqual(int(info['backend-content-length']), - len(frag_archives[-1])) - # assert length was the same for all connections - self.assertEqual(int(info['backend-content-length']), - len(frag_archives[0])) - - # validate some footer metadata - self.assertEqual(footer_part['X-Document'], 'object metadata') - footer_metadata = json.loads(footer_part.get_payload()) - self.assertTrue(footer_metadata) - expected = {} - # update expected with footers from the callback... - footers_callback(expected) - expected.update({ - 'X-Object-Sysmeta-Ec-Content-Length': str(size), - 'X-Backend-Container-Update-Override-Size': str(size), - 'X-Object-Sysmeta-Ec-Etag': etag, - 'X-Backend-Container-Update-Override-Etag': etag, - 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size), - 'Etag': md5(obj_part.get_payload()).hexdigest()}) - for header, value in expected.items(): - self.assertEqual(footer_metadata[header], value) - - # sanity on commit message - self.assertEqual(commit_part['X-Document'], 'put commit') - - self.assertEqual(len(frag_archives), self.replicas()) - fragment_size = self.policy.fragment_size - node_payloads = [] - for fa in frag_archives: - payload = [fa[x:x + fragment_size] - for x in range(0, len(fa), fragment_size)] - node_payloads.append(payload) - fragment_payloads = zip(*node_payloads) - - expected_body = '' - for fragment_payload in fragment_payloads: - self.assertEqual(len(fragment_payload), self.replicas()) - if True: - fragment_payload = list(fragment_payload) - expected_body += self.policy.pyeclib_driver.decode( - fragment_payload) - - self.assertEqual(len(test_body), len(expected_body)) - self.assertEqual(test_body, expected_body) - - def test_PUT_with_footers(self): - # verify footers supplied by a footers callback being added to - # trailing metadata - segment_size = self.policy.ec_segment_size - test_body = ('asdf' * segment_size)[:-10] - etag = md5(test_body).hexdigest() - size = len(test_body) - codes = [201] * self.replicas() - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - resp_headers = { - 'Some-Other-Header': 'Four', - 'Etag': 'ignored', - } - - def do_test(footers_to_add, expect_added): - put_requests = defaultdict( - lambda: {'boundary': None, 'chunks': []}) - - def capture_body(conn, chunk): - put_requests[conn.connection_id]['chunks'].append(chunk) - - def capture_headers(ip, port, device, part, method, path, headers, - **kwargs): - conn_id = kwargs['connection_id'] - put_requests[conn_id]['boundary'] = headers[ - 'X-Backend-Obj-Multipart-Mime-Boundary'] - put_requests[conn_id]['x-timestamp'] = headers[ - 'X-Timestamp'] - - def footers_callback(footers): - footers.update(footers_to_add) - env = {'swift.callback.update_footers': footers_callback} - req = swift.common.swob.Request.blank( - '/v1/a/c/o', method='PUT', environ=env, body=test_body) - - with set_http_connect(*codes, expect_headers=expect_headers, - give_send=capture_body, - give_connect=capture_headers, - headers=resp_headers): - resp = req.get_response(self.app) - - self.assertEqual(resp.status_int, 201) - timestamps = {captured_req['x-timestamp'] - for captured_req in put_requests.values()} - self.assertEqual(1, len(timestamps), timestamps) - self.assertEqual(dict(resp.headers), { - 'Content-Type': 'text/html; charset=UTF-8', - 'Content-Length': '0', - 'Last-Modified': time.strftime( - "%a, %d %b %Y %H:%M:%S GMT", - time.gmtime(math.ceil(float(timestamps.pop())))), - 'Etag': etag, - }) - for connection_id, info in put_requests.items(): - body = unchunk_body(''.join(info['chunks'])) - # email.parser.FeedParser doesn't know how to take a multipart - # message and boundary together and parse it; it only knows how - # to take a string, parse the headers, and figure out the - # boundary on its own. - parser = email.parser.FeedParser() - parser.feed( - "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" - % info['boundary']) - parser.feed(body) - message = parser.close() - - self.assertTrue(message.is_multipart()) # sanity check - mime_parts = message.get_payload() - self.assertEqual(len(mime_parts), 3) - obj_part, footer_part, commit_part = mime_parts - - # validate EC footer metadata - should always be present - self.assertEqual(footer_part['X-Document'], 'object metadata') - footer_metadata = json.loads(footer_part.get_payload()) - self.assertIsNotNone( - footer_metadata.pop('X-Object-Sysmeta-Ec-Frag-Index')) - expected = { - 'X-Object-Sysmeta-Ec-Scheme': - self.policy.ec_scheme_description, - 'X-Object-Sysmeta-Ec-Content-Length': str(size), - 'X-Object-Sysmeta-Ec-Etag': etag, - 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size), - 'Etag': md5(obj_part.get_payload()).hexdigest()} - expected.update(expect_added) - for header, value in expected.items(): - self.assertIn(header, footer_metadata) - self.assertEqual(value, footer_metadata[header]) - footer_metadata.pop(header) - self.assertFalse(footer_metadata) - - # sanity check - middleware sets no footer, expect EC overrides - footers_to_add = {} - expect_added = { - 'X-Backend-Container-Update-Override-Size': str(size), - 'X-Backend-Container-Update-Override-Etag': etag} - do_test(footers_to_add, expect_added) - - # middleware cannot overwrite any EC sysmeta - footers_to_add = { - 'X-Object-Sysmeta-Ec-Content-Length': str(size + 1), - 'X-Object-Sysmeta-Ec-Etag': 'other etag', - 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size + 1), - 'X-Object-Sysmeta-Ec-Unused-But-Reserved': 'ignored'} - do_test(footers_to_add, expect_added) - - # middleware can add x-object-sysmeta- headers including - # x-object-sysmeta-container-update-override headers - footers_to_add = { - 'X-Object-Sysmeta-Foo': 'bar', - 'X-Object-Sysmeta-Container-Update-Override-Size': - str(size + 1), - 'X-Object-Sysmeta-Container-Update-Override-Etag': 'other etag', - 'X-Object-Sysmeta-Container-Update-Override-Ping': 'pong' - } - expect_added.update(footers_to_add) - do_test(footers_to_add, expect_added) - - # middleware can also overwrite x-backend-container-update-override - # headers - override_footers = { - 'X-Backend-Container-Update-Override-Wham': 'bam', - 'X-Backend-Container-Update-Override-Size': str(size + 2), - 'X-Backend-Container-Update-Override-Etag': 'another etag'} - footers_to_add.update(override_footers) - expect_added.update(override_footers) - do_test(footers_to_add, expect_added) - - def test_PUT_old_obj_server(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - responses = [ - # one server will response 100-continue but not include the - # needful expect headers and the connection will be dropped - ((100, Exception('not used')), {}), - ] + [ - # and pleanty of successful responses too - (201, { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes', - }), - ] * self.replicas() - random.shuffle(responses) - if responses[-1][0] != 201: - # whoops, stupid random - responses = responses[1:] + [responses[0]] - codes, expect_headers = zip(*responses) - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - def test_GET_with_frags_swapped_around(self): segment_size = self.policy.ec_segment_size test_data = ('test' * segment_size)[:-657] @@ -4055,68 +3549,6 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): # 404 shows actual response body size (i.e. 0 for HEAD) self.assertEqual(resp.headers['Content-Length'], '0') - def test_PUT_with_slow_commits(self): - # It's important that this timeout be much less than the delay in - # the slow commit responses so that the slow commits are not waited - # for. - self.app.post_quorum_timeout = 0.01 - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - # plenty of slow commits - response_sleep = 5.0 - codes = [FakeStatus(201, response_sleep=response_sleep) - for i in range(self.replicas())] - # swap out some with regular fast responses - number_of_fast_responses_needed_to_be_quick_enough = \ - self.policy.quorum - fast_indexes = random.sample( - range(self.replicas()), - number_of_fast_responses_needed_to_be_quick_enough) - for i in fast_indexes: - codes[i] = 201 - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - start = time.time() - resp = req.get_response(self.app) - response_time = time.time() - start - self.assertEqual(resp.status_int, 201) - self.assertLess(response_time, response_sleep) - - def test_PUT_with_just_enough_durable_responses(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - - codes = [201] * (self.policy.ec_ndata + 1) - codes += [503] * (self.policy.ec_nparity - 1) - self.assertEqual(len(codes), self.policy.ec_n_unique_fragments) - random.shuffle(codes) - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 201) - - def test_PUT_with_less_durable_responses(self): - req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', - body='') - - codes = [201] * (self.policy.ec_ndata) - codes += [503] * (self.policy.ec_nparity) - self.assertEqual(len(codes), self.policy.ec_n_unique_fragments) - random.shuffle(codes) - expect_headers = { - 'X-Obj-Metadata-Footer': 'yes', - 'X-Obj-Multiphase-Commit': 'yes' - } - with set_http_connect(*codes, expect_headers=expect_headers): - resp = req.get_response(self.app) - self.assertEqual(resp.status_int, 503) - def test_GET_with_invalid_ranges(self): # real body size is segment_size - 10 (just 1 segment) segment_size = self.policy.ec_segment_size @@ -4789,6 +4221,554 @@ class TestECDuplicationObjController( self._test_determine_chunk_destinations_prioritize(1, 0) +class ECCommonPutterMixin(object): + # EC PUT tests common to both Mime and PUT+POST protocols + expect_headers = {} + + def test_PUT_ec_error_during_transfer_data(self): + class FakeReader(object): + def read(self, size): + raise IOError('error message') + + req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', + body='test body') + + req.environ['wsgi.input'] = FakeReader() + req.headers['content-length'] = '6' + codes = [201] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 499) + + def test_PUT_ec_chunkreadtimeout_during_transfer_data(self): + class FakeReader(object): + def read(self, size): + raise exceptions.ChunkReadTimeout() + + req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', + body='test body') + + req.environ['wsgi.input'] = FakeReader() + req.headers['content-length'] = '6' + codes = [201] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 408) + + def test_PUT_ec_timeout_during_transfer_data(self): + class FakeReader(object): + def read(self, size): + raise exceptions.Timeout() + + req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', + body='test body') + + req.environ['wsgi.input'] = FakeReader() + req.headers['content-length'] = '6' + codes = [201] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 499) + + def test_PUT_ec_exception_during_transfer_data(self): + class FakeReader(object): + def read(self, size): + raise Exception('exception message') + + req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', + body='test body') + + req.environ['wsgi.input'] = FakeReader() + req.headers['content-length'] = '6' + codes = [201] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 500) + + +# This is how CommonObjectControllerMixin is supposed to be used: +# @patch_policies(with_ec_default=True) +# class TestECObjControllerDoublePutter(BaseObjectControllerMixin, +# ECCommonPutterMixin, +# unittest.TestCase): +# # tests specific to the PUT+POST protocol +# +# def setUp(self): +# super(TestECObjControllerDoublePutter, self).setUp() +# # force use of the DoublePutter class +# self.app.use_put_v1 = True + + +@patch_policies(with_ec_default=True) +class TestECObjControllerMimePutter(BaseObjectControllerMixin, + ECCommonPutterMixin, + unittest.TestCase): + # tests specific to the older PUT protocol using a MimePutter + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + + def setUp(self): + super(TestECObjControllerMimePutter, self).setUp() + # force use of the MimePutter class + self.app.use_put_v1 = False + + def test_PUT_simple(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_with_body_and_bad_etag(self): + segment_size = self.policy.ec_segment_size + test_body = ('asdf' * segment_size)[:-10] + codes = [201] * self.replicas() + conns = [] + + def capture_expect(conn): + # stash the backend connection so we can verify that it is closed + # (no data will be sent) + conns.append(conn) + + # send a bad etag in the request headers + headers = {'Etag': 'bad etag'} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', headers=headers, body=test_body) + with set_http_connect(*codes, expect_headers=self.expect_headers, + give_expect=capture_expect): + resp = req.get_response(self.app) + self.assertEqual(422, resp.status_int) + self.assertEqual(self.replicas(), len(conns)) + for conn in conns: + self.assertTrue(conn.closed) + + # make the footers callback send a bad Etag footer + footers_callback = make_footers_callback('not the test body') + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', environ=env, body=test_body) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(422, resp.status_int) + + def test_txn_id_logging_ECPUT(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + self.app.logger.txn_id = req.environ['swift.trans_id'] = 'test-txn-id' + codes = [(100, Timeout(), 503, 503)] * self.replicas() + stdout = BytesIO() + with set_http_connect(*codes, expect_headers=self.expect_headers), \ + mock.patch('sys.stdout', stdout): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + for line in stdout.getvalue().splitlines(): + self.assertIn('test-txn-id', line) + self.assertIn('Trying to get ', + stdout.getvalue()) + + def test_PUT_with_explicit_commit_status(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [(100, 100, 201)] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_mostly_success(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * self.quorum() + codes += [503] * (self.replicas() - len(codes)) + random.shuffle(codes) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_error_commit(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [(100, 503, Exception('not used'))] * self.replicas() + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def test_PUT_mostly_success_commit(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * self.quorum() + codes += [(100, 503, Exception('not used'))] * ( + self.replicas() - len(codes)) + random.shuffle(codes) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_mostly_error_commit(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [(100, 503, Exception('not used'))] * self.quorum() + if isinstance(self.policy, ECStoragePolicy): + codes *= self.policy.ec_duplication_factor + codes += [201] * (self.replicas() - len(codes)) + random.shuffle(codes) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def test_PUT_commit_timeout(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * (self.replicas() - 1) + codes.append((100, Timeout(), Exception('not used'))) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_commit_exception(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * (self.replicas() - 1) + codes.append((100, Exception('kaboom!'), Exception('not used'))) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_with_body(self): + segment_size = self.policy.ec_segment_size + test_body = (b'asdf' * segment_size)[:-10] + # make the footers callback not include Etag footer so that we can + # verify that the correct EC-calculated Etag is included in footers + # sent to backend + footers_callback = make_footers_callback() + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', environ=env) + etag = md5(test_body).hexdigest() + size = len(test_body) + req.body = test_body + codes = [201] * self.replicas() + resp_headers = { + 'Some-Other-Header': 'Four', + 'Etag': 'ignored', + } + + put_requests = defaultdict(lambda: {'boundary': None, 'chunks': []}) + + def capture_body(conn, chunk): + put_requests[conn.connection_id]['chunks'].append(chunk) + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + conn_id = kwargs['connection_id'] + put_requests[conn_id]['boundary'] = headers[ + 'X-Backend-Obj-Multipart-Mime-Boundary'] + put_requests[conn_id]['backend-content-length'] = headers[ + 'X-Backend-Obj-Content-Length'] + put_requests[conn_id]['x-timestamp'] = headers[ + 'X-Timestamp'] + + with set_http_connect(*codes, expect_headers=self.expect_headers, + give_send=capture_body, + give_connect=capture_headers, + headers=resp_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 201) + timestamps = {captured_req['x-timestamp'] + for captured_req in put_requests.values()} + self.assertEqual(1, len(timestamps), timestamps) + self.assertEqual(dict(resp.headers), { + 'Content-Type': 'text/html; charset=UTF-8', + 'Content-Length': '0', + 'Last-Modified': time.strftime( + "%a, %d %b %Y %H:%M:%S GMT", + time.gmtime(math.ceil(float(timestamps.pop())))), + 'Etag': etag, + }) + frag_archives = [] + for connection_id, info in put_requests.items(): + body = unchunk_body(''.join(info['chunks'])) + self.assertIsNotNone(info['boundary'], + "didn't get boundary for conn %r" % ( + connection_id,)) + self.assertTrue(size > int(info['backend-content-length']) > 0, + "invalid backend-content-length for conn %r" % ( + connection_id,)) + + # email.parser.FeedParser doesn't know how to take a multipart + # message and boundary together and parse it; it only knows how + # to take a string, parse the headers, and figure out the + # boundary on its own. + parser = email.parser.FeedParser() + parser.feed( + "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" % + info['boundary']) + parser.feed(body) + message = parser.close() + + self.assertTrue(message.is_multipart()) # sanity check + mime_parts = message.get_payload() + self.assertEqual(len(mime_parts), 3) + obj_part, footer_part, commit_part = mime_parts + + # attach the body to frag_archives list + self.assertEqual(obj_part['X-Document'], 'object body') + frag_archives.append(obj_part.get_payload()) + + # assert length was correct for this connection + self.assertEqual(int(info['backend-content-length']), + len(frag_archives[-1])) + # assert length was the same for all connections + self.assertEqual(int(info['backend-content-length']), + len(frag_archives[0])) + + # validate some footer metadata + self.assertEqual(footer_part['X-Document'], 'object metadata') + footer_metadata = json.loads(footer_part.get_payload()) + self.assertTrue(footer_metadata) + expected = {} + # update expected with footers from the callback... + footers_callback(expected) + expected.update({ + 'X-Object-Sysmeta-Ec-Content-Length': str(size), + 'X-Backend-Container-Update-Override-Size': str(size), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Container-Update-Override-Etag': etag, + 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size), + 'Etag': md5(obj_part.get_payload()).hexdigest()}) + for header, value in expected.items(): + self.assertEqual(footer_metadata[header], value) + + # sanity on commit message + self.assertEqual(commit_part['X-Document'], 'put commit') + + self.assertEqual(len(frag_archives), self.replicas()) + fragment_size = self.policy.fragment_size + node_payloads = [] + for fa in frag_archives: + payload = [fa[x:x + fragment_size] + for x in range(0, len(fa), fragment_size)] + node_payloads.append(payload) + fragment_payloads = zip(*node_payloads) + + expected_body = '' + for fragment_payload in fragment_payloads: + self.assertEqual(len(fragment_payload), self.replicas()) + if True: + fragment_payload = list(fragment_payload) + expected_body += self.policy.pyeclib_driver.decode( + fragment_payload) + + self.assertEqual(len(test_body), len(expected_body)) + self.assertEqual(test_body, expected_body) + + def test_PUT_with_footers(self): + # verify footers supplied by a footers callback being added to + # trailing metadata + segment_size = self.policy.ec_segment_size + test_body = ('asdf' * segment_size)[:-10] + etag = md5(test_body).hexdigest() + size = len(test_body) + codes = [201] * self.replicas() + resp_headers = { + 'Some-Other-Header': 'Four', + 'Etag': 'ignored', + } + + def do_test(footers_to_add, expect_added): + put_requests = defaultdict( + lambda: {'boundary': None, 'chunks': []}) + + def capture_body(conn, chunk): + put_requests[conn.connection_id]['chunks'].append(chunk) + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + conn_id = kwargs['connection_id'] + put_requests[conn_id]['boundary'] = headers[ + 'X-Backend-Obj-Multipart-Mime-Boundary'] + put_requests[conn_id]['x-timestamp'] = headers[ + 'X-Timestamp'] + + def footers_callback(footers): + footers.update(footers_to_add) + env = {'swift.callback.update_footers': footers_callback} + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='PUT', environ=env, body=test_body) + + with set_http_connect(*codes, expect_headers=self.expect_headers, + give_send=capture_body, + give_connect=capture_headers, + headers=resp_headers): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 201) + timestamps = {captured_req['x-timestamp'] + for captured_req in put_requests.values()} + self.assertEqual(1, len(timestamps), timestamps) + self.assertEqual(dict(resp.headers), { + 'Content-Type': 'text/html; charset=UTF-8', + 'Content-Length': '0', + 'Last-Modified': time.strftime( + "%a, %d %b %Y %H:%M:%S GMT", + time.gmtime(math.ceil(float(timestamps.pop())))), + 'Etag': etag, + }) + for connection_id, info in put_requests.items(): + body = unchunk_body(''.join(info['chunks'])) + # email.parser.FeedParser doesn't know how to take a multipart + # message and boundary together and parse it; it only knows how + # to take a string, parse the headers, and figure out the + # boundary on its own. + parser = email.parser.FeedParser() + parser.feed( + "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" + % info['boundary']) + parser.feed(body) + message = parser.close() + + self.assertTrue(message.is_multipart()) # sanity check + mime_parts = message.get_payload() + self.assertEqual(len(mime_parts), 3) + obj_part, footer_part, commit_part = mime_parts + + # validate EC footer metadata - should always be present + self.assertEqual(footer_part['X-Document'], 'object metadata') + footer_metadata = json.loads(footer_part.get_payload()) + self.assertIsNotNone( + footer_metadata.pop('X-Object-Sysmeta-Ec-Frag-Index')) + expected = { + 'X-Object-Sysmeta-Ec-Scheme': + self.policy.ec_scheme_description, + 'X-Object-Sysmeta-Ec-Content-Length': str(size), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size), + 'Etag': md5(obj_part.get_payload()).hexdigest()} + expected.update(expect_added) + for header, value in expected.items(): + self.assertIn(header, footer_metadata) + self.assertEqual(value, footer_metadata[header]) + footer_metadata.pop(header) + self.assertFalse(footer_metadata) + + # sanity check - middleware sets no footer, expect EC overrides + footers_to_add = {} + expect_added = { + 'X-Backend-Container-Update-Override-Size': str(size), + 'X-Backend-Container-Update-Override-Etag': etag} + do_test(footers_to_add, expect_added) + + # middleware cannot overwrite any EC sysmeta + footers_to_add = { + 'X-Object-Sysmeta-Ec-Content-Length': str(size + 1), + 'X-Object-Sysmeta-Ec-Etag': 'other etag', + 'X-Object-Sysmeta-Ec-Segment-Size': str(segment_size + 1), + 'X-Object-Sysmeta-Ec-Unused-But-Reserved': 'ignored'} + do_test(footers_to_add, expect_added) + + # middleware can add x-object-sysmeta- headers including + # x-object-sysmeta-container-update-override headers + footers_to_add = { + 'X-Object-Sysmeta-Foo': 'bar', + 'X-Object-Sysmeta-Container-Update-Override-Size': + str(size + 1), + 'X-Object-Sysmeta-Container-Update-Override-Etag': 'other etag', + 'X-Object-Sysmeta-Container-Update-Override-Ping': 'pong' + } + expect_added.update(footers_to_add) + do_test(footers_to_add, expect_added) + + # middleware can also overwrite x-backend-container-update-override + # headers + override_footers = { + 'X-Backend-Container-Update-Override-Wham': 'bam', + 'X-Backend-Container-Update-Override-Size': str(size + 2), + 'X-Backend-Container-Update-Override-Etag': 'another etag'} + footers_to_add.update(override_footers) + expect_added.update(override_footers) + do_test(footers_to_add, expect_added) + + def test_PUT_old_obj_server(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + responses = [ + # one server will response 100-continue but not include the + # needful expect headers and the connection will be dropped + ((100, Exception('not used')), {}), + ] + [ + # and pleanty of successful responses too + (201, { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes', + }), + ] * self.replicas() + random.shuffle(responses) + if responses[-1][0] != 201: + # whoops, stupid random + responses = responses[1:] + [responses[0]] + codes, expect_headers = zip(*responses) + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_with_slow_commits(self): + # It's important that this timeout be much less than the delay in + # the slow commit responses so that the slow commits are not waited + # for. + self.app.post_quorum_timeout = 0.01 + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + # plenty of slow commits + response_sleep = 5.0 + codes = [FakeStatus(201, response_sleep=response_sleep) + for i in range(self.replicas())] + # swap out some with regular fast responses + number_of_fast_responses_needed_to_be_quick_enough = \ + self.policy.quorum + fast_indexes = random.sample( + range(self.replicas()), + number_of_fast_responses_needed_to_be_quick_enough) + for i in fast_indexes: + codes[i] = 201 + with set_http_connect(*codes, expect_headers=self.expect_headers): + start = time.time() + resp = req.get_response(self.app) + response_time = time.time() - start + self.assertEqual(resp.status_int, 201) + self.assertLess(response_time, response_sleep) + + def test_PUT_with_just_enough_durable_responses(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + + codes = [201] * (self.policy.ec_ndata + 1) + codes += [503] * (self.policy.ec_nparity - 1) + self.assertEqual(len(codes), self.policy.ec_n_unique_fragments) + random.shuffle(codes) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_PUT_with_less_durable_responses(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + + codes = [201] * (self.policy.ec_ndata) + codes += [503] * (self.policy.ec_nparity) + self.assertEqual(len(codes), self.policy.ec_n_unique_fragments) + random.shuffle(codes) + with set_http_connect(*codes, expect_headers=self.expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + class TestNumContainerUpdates(unittest.TestCase): def test_it(self): test_cases = [