Merge "Set backend content length for fallocate - EC Policy"
This commit is contained in:
@@ -1772,7 +1772,7 @@ class ECPutter(object):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
|
def connect(cls, node, part, path, headers, conn_timeout, node_timeout,
|
||||||
chunked=False):
|
chunked=False, expected_frag_archive_size=None):
|
||||||
"""
|
"""
|
||||||
Connect to a backend node and send the headers.
|
Connect to a backend node and send the headers.
|
||||||
|
|
||||||
@@ -1794,9 +1794,10 @@ class ECPutter(object):
|
|||||||
# we must use chunked encoding.
|
# we must use chunked encoding.
|
||||||
headers['Transfer-Encoding'] = 'chunked'
|
headers['Transfer-Encoding'] = 'chunked'
|
||||||
headers['Expect'] = '100-continue'
|
headers['Expect'] = '100-continue'
|
||||||
if 'Content-Length' in headers:
|
|
||||||
headers['X-Backend-Obj-Content-Length'] = \
|
# make sure this isn't there
|
||||||
headers.pop('Content-Length')
|
headers.pop('Content-Length')
|
||||||
|
headers['X-Backend-Obj-Content-Length'] = expected_frag_archive_size
|
||||||
|
|
||||||
headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary
|
headers['X-Backend-Obj-Multipart-Mime-Boundary'] = mime_boundary
|
||||||
|
|
||||||
@@ -2121,16 +2122,41 @@ class ECObjectController(BaseObjectController):
|
|||||||
# the object server will get different bytes, so these
|
# the object server will get different bytes, so these
|
||||||
# values do not apply (Content-Length might, in general, but
|
# values do not apply (Content-Length might, in general, but
|
||||||
# in the specific case of replication vs. EC, it doesn't).
|
# in the specific case of replication vs. EC, it doesn't).
|
||||||
headers.pop('Content-Length', None)
|
client_cl = headers.pop('Content-Length', None)
|
||||||
headers.pop('Etag', None)
|
headers.pop('Etag', None)
|
||||||
|
|
||||||
|
expected_frag_size = None
|
||||||
|
if client_cl:
|
||||||
|
policy_index = int(headers.get('X-Backend-Storage-Policy-Index'))
|
||||||
|
policy = POLICIES.get_by_index(policy_index)
|
||||||
|
# TODO: PyECLib <= 1.2.0 looks to return the segment info
|
||||||
|
# different from the input for aligned data efficiency but
|
||||||
|
# Swift never does. So calculate the fragment length Swift
|
||||||
|
# will actually send to object sever by making two different
|
||||||
|
# get_segment_info calls (until PyECLib fixed).
|
||||||
|
# policy.fragment_size makes the call using segment size,
|
||||||
|
# and the next call is to get info for the last segment
|
||||||
|
|
||||||
|
# get number of fragments except the tail - use truncation //
|
||||||
|
num_fragments = int(client_cl) // policy.ec_segment_size
|
||||||
|
expected_frag_size = policy.fragment_size * num_fragments
|
||||||
|
|
||||||
|
# calculate the tail fragment_size by hand and add it to
|
||||||
|
# expected_frag_size
|
||||||
|
last_segment_size = int(client_cl) % policy.ec_segment_size
|
||||||
|
if last_segment_size:
|
||||||
|
last_info = policy.pyeclib_driver.get_segment_info(
|
||||||
|
last_segment_size, policy.ec_segment_size)
|
||||||
|
expected_frag_size += last_info['fragment_size']
|
||||||
|
|
||||||
self.app.logger.thread_locals = logger_thread_locals
|
self.app.logger.thread_locals = logger_thread_locals
|
||||||
for node in node_iter:
|
for node in node_iter:
|
||||||
try:
|
try:
|
||||||
putter = ECPutter.connect(
|
putter = ECPutter.connect(
|
||||||
node, part, path, headers,
|
node, part, path, headers,
|
||||||
conn_timeout=self.app.conn_timeout,
|
conn_timeout=self.app.conn_timeout,
|
||||||
node_timeout=self.app.node_timeout)
|
node_timeout=self.app.node_timeout,
|
||||||
|
expected_frag_archive_size=expected_frag_size)
|
||||||
self.app.set_node_timing(node, putter.connect_duration)
|
self.app.set_node_timing(node, putter.connect_duration)
|
||||||
return putter
|
return putter
|
||||||
except InsufficientStorage:
|
except InsufficientStorage:
|
||||||
|
@@ -1497,6 +1497,8 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
|
|||||||
conn_id = kwargs['connection_id']
|
conn_id = kwargs['connection_id']
|
||||||
put_requests[conn_id]['boundary'] = headers[
|
put_requests[conn_id]['boundary'] = headers[
|
||||||
'X-Backend-Obj-Multipart-Mime-Boundary']
|
'X-Backend-Obj-Multipart-Mime-Boundary']
|
||||||
|
put_requests[conn_id]['backend-content-length'] = headers[
|
||||||
|
'X-Backend-Obj-Content-Length']
|
||||||
|
|
||||||
with set_http_connect(*codes, expect_headers=expect_headers,
|
with set_http_connect(*codes, expect_headers=expect_headers,
|
||||||
give_send=capture_body,
|
give_send=capture_body,
|
||||||
@@ -1510,6 +1512,9 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
|
|||||||
self.assertTrue(info['boundary'] is not None,
|
self.assertTrue(info['boundary'] is not None,
|
||||||
"didn't get boundary for conn %r" % (
|
"didn't get boundary for conn %r" % (
|
||||||
connection_id,))
|
connection_id,))
|
||||||
|
self.assertTrue(size > int(info['backend-content-length']) > 0,
|
||||||
|
"invalid backend-content-length for conn %r" % (
|
||||||
|
connection_id,))
|
||||||
|
|
||||||
# email.parser.FeedParser doesn't know how to take a multipart
|
# email.parser.FeedParser doesn't know how to take a multipart
|
||||||
# message and boundary together and parse it; it only knows how
|
# message and boundary together and parse it; it only knows how
|
||||||
@@ -1531,6 +1536,13 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
|
|||||||
self.assertEqual(obj_part['X-Document'], 'object body')
|
self.assertEqual(obj_part['X-Document'], 'object body')
|
||||||
frag_archives.append(obj_part.get_payload())
|
frag_archives.append(obj_part.get_payload())
|
||||||
|
|
||||||
|
# assert length was correct for this connection
|
||||||
|
self.assertEqual(int(info['backend-content-length']),
|
||||||
|
len(frag_archives[-1]))
|
||||||
|
# assert length was the same for all connections
|
||||||
|
self.assertEqual(int(info['backend-content-length']),
|
||||||
|
len(frag_archives[0]))
|
||||||
|
|
||||||
# validate some footer metadata
|
# validate some footer metadata
|
||||||
self.assertEqual(footer_part['X-Document'], 'object metadata')
|
self.assertEqual(footer_part['X-Document'], 'object metadata')
|
||||||
footer_metadata = json.loads(footer_part.get_payload())
|
footer_metadata = json.loads(footer_part.get_payload())
|
||||||
|
Reference in New Issue
Block a user