Send correct size in POST async update for EC object

When a PUT request is made to an EC object the resulting container
update must include the override values for the actual object
etag and size, as opposed to the fragment etag and size. When a POST
request is made the same override values should be included in the
container update, but currently the update includes the incorrect EC
fragment size (but the correct body etag).

This is ok so long as the update for the object PUT request arrives at
the container server first (whether by direct update or replication)
because the etag and size values in an update due to an object POST
will not have a newer timestamp that the PUT and will therefore be
ignored at the container server.

However, if the update due to the object PUT request has not arrived
at the container server when the update due to the object POST
arrives, then the etag and incorrect size sent with the POST update
will be recorded in the container server. If the update due to the PUT
subsequently arrives it will not fix this error because the timestamp
of its etag and size values is not greater than that of the already
recorded values.

Fortunately the correct object body size is persisted with the object
as X-Backend-Container-Update-Override-Size sysmeta so this patch
fixes the container update due to a POST to use that value instead of
the Content-Length metadata.

Closes-Bug: #1582723
Change-Id: Ide7c9c59eb41aa09eaced2acfd0700f882c6eab1
This commit is contained in:
Alistair Coles 2016-05-17 14:22:05 +01:00
parent 9482ef89a2
commit c1b1a5a0ee
4 changed files with 176 additions and 67 deletions

@ -543,15 +543,6 @@ class ObjectController(BaseStorageServer):
except (DiskFileXattrNotSupported, DiskFileNoSpace):
return HTTPInsufficientStorage(drive=device, request=request)
update_etag = orig_metadata['ETag']
if 'X-Object-Sysmeta-Ec-Etag' in orig_metadata:
# For EC policy, send X-Object-Sysmeta-Ec-Etag which is same as the
# X-Backend-Container-Update-Override-Etag value sent with the
# original PUT. We have to send Etag (and size etc) with a POST
# container update because the original PUT container update may
# have failed or be in async_pending.
update_etag = orig_metadata['X-Object-Sysmeta-Ec-Etag']
if (content_type_headers['Content-Type-Timestamp']
!= disk_file.data_timestamp):
# Current content-type is not from the datafile, but the datafile
@ -567,17 +558,34 @@ class ObjectController(BaseStorageServer):
content_type_headers['Content-Type'] += (';swift_bytes=%s'
% swift_bytes)
update_headers = HeaderKeyDict({
'x-size': orig_metadata['Content-Length'],
'x-content-type': content_type_headers['Content-Type'],
'x-timestamp': disk_file.data_timestamp.internal,
'x-content-type-timestamp':
content_type_headers['Content-Type-Timestamp'],
'x-meta-timestamp': metadata['X-Timestamp'],
'x-etag': orig_metadata['ETag']})
# Special cases for backwards compatibility.
# For EC policy, send X-Object-Sysmeta-Ec-Etag which is same as the
# X-Backend-Container-Update-Override-Etag value sent with the original
# PUT. Similarly send X-Object-Sysmeta-Ec-Content-Length which is the
# same as the X-Backend-Container-Update-Override-Size value. We have
# to send Etag and size with a POST container update because the
# original PUT container update may have failed or be in async_pending.
if 'X-Object-Sysmeta-Ec-Etag' in orig_metadata:
update_headers['X-Etag'] = orig_metadata[
'X-Object-Sysmeta-Ec-Etag']
if 'X-Object-Sysmeta-Ec-Content-Length' in orig_metadata:
update_headers['X-Size'] = orig_metadata[
'X-Object-Sysmeta-Ec-Content-Length']
self._check_container_override(update_headers, orig_metadata)
# object POST updates are PUT to the container server
self.container_update(
'PUT', account, container, obj, request,
HeaderKeyDict({
'x-size': orig_metadata['Content-Length'],
'x-content-type': content_type_headers['Content-Type'],
'x-timestamp': disk_file.data_timestamp.internal,
'x-content-type-timestamp':
content_type_headers['Content-Type-Timestamp'],
'x-meta-timestamp': metadata['X-Timestamp'],
'x-etag': update_etag}),
'PUT', account, container, obj, request, update_headers,
device, policy)
return HTTPAccepted(request=request)

@ -443,10 +443,13 @@ class ProbeTest(unittest.TestCase):
swift_dir = /etc/swift
[pipeline:main]
pipeline = catch_errors cache proxy-server
pipeline = catch_errors cache copy proxy-server
[app:proxy-server]
use = egg:swift#proxy
[filter:copy]
use = egg:swift#copy
object_post_as_copy = %s
[filter:cache]

@ -14,21 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
from io import StringIO
from tempfile import mkdtemp
from textwrap import dedent
from unittest import main
from uuid import uuid4
from swiftclient import client
from swift.common import direct_client, internal_client
from swift.common import direct_client
from swift.common.manager import Manager
from test.probe.common import kill_nonprimary_server, \
kill_server, ReplProbeTest, start_server
kill_server, ReplProbeTest, start_server, ECProbeTest
class TestObjectAsyncUpdate(ReplProbeTest):
@ -73,67 +68,165 @@ class TestUpdateOverrides(ReplProbeTest):
Verify that the update override headers take effect and override
values propagate to the container server.
"""
def setUp(self):
"""
Reset all environment and start all servers.
"""
super(TestUpdateOverrides, self).setUp()
self.tempdir = mkdtemp()
conf_path = os.path.join(self.tempdir, 'internal_client.conf')
conf_body = """
[DEFAULT]
swift_dir = /etc/swift
[pipeline:main]
pipeline = catch_errors cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
[filter:cache]
use = egg:swift#memcache
[filter:catch_errors]
use = egg:swift#catch_errors
"""
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
self.int_client = internal_client.InternalClient(conf_path, 'test', 1)
def tearDown(self):
super(TestUpdateOverrides, self).tearDown()
shutil.rmtree(self.tempdir)
def test(self):
def test_update_during_PUT(self):
# verify that update sent during a PUT has override values
int_client = self.make_internal_client()
headers = {
'Content-Type': 'text/plain',
'X-Backend-Container-Update-Override-Etag': 'override-etag',
'X-Backend-Container-Update-Override-Content-Type': 'override-type'
'X-Backend-Container-Update-Override-Content-Type':
'override-type',
'X-Backend-Container-Update-Override-Size': '1999'
}
client.put_container(self.url, self.token, 'c1',
headers={'X-Storage-Policy':
self.policy.name})
self.int_client.upload_object(StringIO(u'stuff'), self.account,
'c1', 'o1', headers)
int_client.upload_object(
StringIO(u'stuff'), self.account, 'c1', 'o1', headers)
# Run the object-updaters to be sure updates are done
Manager(['object-updater']).once()
meta = self.int_client.get_object_metadata(self.account, 'c1', 'o1')
meta = int_client.get_object_metadata(self.account, 'c1', 'o1')
self.assertEqual('text/plain', meta['content-type'])
self.assertEqual('c13d88cb4cb02003daedb8a84e5d272a', meta['etag'])
self.assertEqual('5', meta['content-length'])
obj_iter = self.int_client.iter_objects(self.account, 'c1')
obj_iter = int_client.iter_objects(self.account, 'c1')
for obj in obj_iter:
if obj['name'] == 'o1':
self.assertEqual('override-etag', obj['hash'])
self.assertEqual('override-type', obj['content_type'])
self.assertEqual(1999, obj['bytes'])
break
else:
self.fail('Failed to find object o1 in listing')
class TestUpdateOverridesEC(ECProbeTest):
# verify that the container update overrides used with EC policies make
# it to the container servers when container updates are sync or async
# and possibly re-ordered with respect to object PUT and POST requests.
def test_async_update_after_PUT(self):
cpart, cnodes = self.container_ring.get_nodes(self.account, 'c1')
client.put_container(self.url, self.token, 'c1',
headers={'X-Storage-Policy':
self.policy.name})
# put an object while one container server is stopped so that we force
# an async update to it
kill_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
content = u'stuff'
client.put_object(self.url, self.token, 'c1', 'o1', contents=content)
meta = client.head_object(self.url, self.token, 'c1', 'o1')
# re-start the container server and assert that it does not yet know
# about the object
start_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
self.assertFalse(direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1])
# Run the object-updaters to be sure updates are done
Manager(['object-updater']).once()
# check the re-started container server has update with override values
obj = direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1][0]
self.assertEqual(meta['etag'], obj['hash'])
self.assertEqual(len(content), obj['bytes'])
def test_update_during_POST_only(self):
# verify correct update values when PUT update is missed but then a
# POST update succeeds *before* the PUT async pending update is sent
cpart, cnodes = self.container_ring.get_nodes(self.account, 'c1')
client.put_container(self.url, self.token, 'c1',
headers={'X-Storage-Policy':
self.policy.name})
# put an object while one container server is stopped so that we force
# an async update to it
kill_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
content = u'stuff'
client.put_object(self.url, self.token, 'c1', 'o1', contents=content)
meta = client.head_object(self.url, self.token, 'c1', 'o1')
# re-start the container server and assert that it does not yet know
# about the object
start_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
self.assertFalse(direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1])
# use internal client for POST so we can force fast-post mode
int_client = self.make_internal_client(object_post_as_copy=False)
int_client.set_object_metadata(
self.account, 'c1', 'o1', {'X-Object-Meta-Fruit': 'Tomato'})
self.assertEqual(
'Tomato',
int_client.get_object_metadata(self.account, 'c1', 'o1')
['x-object-meta-fruit']) # sanity
# check the re-started container server has update with override values
obj = direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1][0]
self.assertEqual(meta['etag'], obj['hash'])
self.assertEqual(len(content), obj['bytes'])
# Run the object-updaters to send the async pending from the PUT
Manager(['object-updater']).once()
# check container listing metadata is still correct
obj = direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1][0]
self.assertEqual(meta['etag'], obj['hash'])
self.assertEqual(len(content), obj['bytes'])
def test_async_updates_after_PUT_and_POST(self):
# verify correct update values when PUT update and POST updates are
# missed but then async updates are sent
cpart, cnodes = self.container_ring.get_nodes(self.account, 'c1')
client.put_container(self.url, self.token, 'c1',
headers={'X-Storage-Policy':
self.policy.name})
# PUT and POST to object while one container server is stopped so that
# we force async updates to it
kill_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
content = u'stuff'
client.put_object(self.url, self.token, 'c1', 'o1', contents=content)
meta = client.head_object(self.url, self.token, 'c1', 'o1')
# use internal client for POST so we can force fast-post mode
int_client = self.make_internal_client(object_post_as_copy=False)
int_client.set_object_metadata(
self.account, 'c1', 'o1', {'X-Object-Meta-Fruit': 'Tomato'})
self.assertEqual(
'Tomato',
int_client.get_object_metadata(self.account, 'c1', 'o1')
['x-object-meta-fruit']) # sanity
# re-start the container server and assert that it does not yet know
# about the object
start_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
self.assertFalse(direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1])
# Run the object-updaters to send the async pendings
Manager(['object-updater']).once()
# check container listing metadata is still correct
obj = direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1][0]
self.assertEqual(meta['etag'], obj['hash'])
self.assertEqual(len(content), obj['bytes'])
if __name__ == '__main__':
main()

@ -507,21 +507,26 @@ class TestObjectController(unittest.TestCase):
headers_out, objdevice, policy):
calls_made.append((headers_out, policy))
body = 'test'
headers = {
'X-Timestamp': t[1].internal,
'Content-Type': 'application/octet-stream;swift_bytes=123456789',
'Content-Length': '4',
'X-Backend-Storage-Policy-Index': int(policy)}
if policy.policy_type == EC_POLICY:
# EC fragments will typically have a different size to the body and
# for small bodies the fragments may be longer. For this test all
# that matters is that the fragment and body lengths differ.
body = body + 'ec_overhead'
headers['X-Backend-Container-Update-Override-Etag'] = update_etag
headers['X-Backend-Container-Update-Override-Size'] = '4'
headers['X-Object-Sysmeta-Ec-Etag'] = update_etag
headers['X-Object-Sysmeta-Ec-Content-Length'] = '4'
headers['X-Object-Sysmeta-Ec-Frag-Index'] = 2
headers['Content-Length'] = str(len(body))
req = Request.blank('/sda1/p/a/c/o',
req = Request.blank('/sda1/p/a/c/o', body=body,
environ={'REQUEST_METHOD': 'PUT'},
headers=headers)
req.body = 'test'
with mock.patch('swift.obj.server.ObjectController.container_update',
mock_container_update):
resp = req.get_response(self.object_controller)