Merge "Reduce backend requests for SLO If-Match / HEAD requests"

This commit is contained in:
Jenkins 2016-11-22 18:01:06 +00:00 committed by Gerrit Code Review
commit d299a0a309
3 changed files with 208 additions and 85 deletions

View File

@ -200,15 +200,12 @@ the manifest and the segments it's referring to) in the container and account
metadata which can be used for stats purposes.
"""
from six.moves import range
from collections import defaultdict
from datetime import datetime
import json
import mimetypes
import re
import six
from six import BytesIO
from hashlib import md5
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
@ -219,7 +216,8 @@ from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
closing_if_possible, LRUCache, StreamingPile
from swift.common.request_helpers import SegmentedIterable
from swift.common.request_helpers import SegmentedIterable, \
get_sys_meta_prefix, update_etag_is_at_header
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
from swift.common.wsgi import WSGIContext, make_subrequest
@ -236,6 +234,9 @@ REQUIRED_SLO_KEYS = set(['path', 'etag', 'size_bytes'])
OPTIONAL_SLO_KEYS = set(['range'])
ALLOWED_SLO_KEYS = REQUIRED_SLO_KEYS | OPTIONAL_SLO_KEYS
SYSMETA_SLO_ETAG = get_sys_meta_prefix('object') + 'slo-etag'
SYSMETA_SLO_SIZE = get_sys_meta_prefix('object') + 'slo-size'
def parse_and_validate_input(req_body, req_path):
"""
@ -361,25 +362,6 @@ def parse_and_validate_input(req_body, req_path):
return parsed_data
class SloPutContext(WSGIContext):
def __init__(self, slo, slo_etag):
super(SloPutContext, self).__init__(slo.app)
self.slo_etag = '"' + slo_etag.hexdigest() + '"'
def handle_slo_put(self, req, start_response):
app_resp = self._app_call(req.environ)
for i in range(len(self._response_headers)):
if self._response_headers[i][0].lower() == 'etag':
self._response_headers[i] = ('Etag', self.slo_etag)
break
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return app_resp
class SloGetContext(WSGIContext):
max_slo_recursion_depth = 10
@ -539,6 +521,9 @@ class SloGetContext(WSGIContext):
Note: this assumes that X-Static-Large-Object has already been found.
"""
if req.method == 'HEAD':
# We've already looked for SYSMETA_SLO_ETAG/SIZE in the response
# and didn't find them. We have to fetch the whole manifest and
# recompute.
return True
response_status = int(self._response_status[:3])
@ -581,14 +566,31 @@ class SloGetContext(WSGIContext):
what may be a static large object manifest (or may not).
:param start_response: WSGI start_response callable
"""
if req.params.get('multipart-manifest') != 'get':
# If this object is an SLO manifest, we may have saved off the
# large object etag during the original PUT. Send an
# X-Backend-Etag-Is-At header so that, if the SLO etag *was*
# saved, we can trust the object-server to respond appropriately
# to If-Match/If-None-Match requests.
update_etag_is_at_header(req, SYSMETA_SLO_ETAG)
resp_iter = self._app_call(req.environ)
# make sure this response is for a static large object manifest
slo_marker = slo_etag = slo_size = None
for header, value in self._response_headers:
if (header.lower() == 'x-static-large-object' and
config_true_value(value)):
header = header.lower()
if header == SYSMETA_SLO_ETAG:
slo_etag = value
elif header == SYSMETA_SLO_SIZE:
slo_size = value
elif (header == 'x-static-large-object' and
config_true_value(value)):
slo_marker = value
if slo_marker and slo_etag and slo_size:
break
else:
if not slo_marker:
# Not a static large object manifest. Just pass it through.
start_response(self._response_status,
self._response_headers,
@ -614,6 +616,22 @@ class SloGetContext(WSGIContext):
self._response_exc_info)
return resp_iter
is_conditional = self._response_status.startswith(('304', '412')) and (
req.if_match or req.if_none_match)
if slo_etag and slo_size and (
req.method == 'HEAD' or is_conditional):
# Since we have length and etag, we can respond immediately
for i, (header, _value) in enumerate(self._response_headers):
lheader = header.lower()
if lheader == 'etag':
self._response_headers[i] = (header, '"%s"' % slo_etag)
elif lheader == 'content-length' and not is_conditional:
self._response_headers[i] = (header, slo_size)
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return resp_iter
if self._need_to_refetch_manifest(req):
req.environ['swift.non_client_disconnect'] = True
close_if_possible(resp_iter)
@ -659,8 +677,7 @@ class SloGetContext(WSGIContext):
new_headers = []
for header, value in resp_headers:
if header.lower() == 'content-length':
new_headers.append(('Content-Length',
len(json_data)))
new_headers.append(('Content-Length', len(json_data)))
else:
new_headers.append((header, value))
self._response_headers = new_headers
@ -680,23 +697,36 @@ class SloGetContext(WSGIContext):
def get_or_head_response(self, req, resp_headers, resp_iter):
segments = self._get_manifest_read(resp_iter)
etag = md5()
content_length = 0
for seg_dict in segments:
if seg_dict.get('range'):
etag.update('%s:%s;' % (seg_dict['hash'], seg_dict['range']))
else:
etag.update(seg_dict['hash'])
slo_etag = None
content_length = None
response_headers = []
for header, value in resp_headers:
lheader = header.lower()
if lheader == SYSMETA_SLO_ETAG:
slo_etag = value
elif lheader == SYSMETA_SLO_SIZE:
content_length = value
elif lheader not in ('etag', 'content-length'):
response_headers.append((header, value))
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=self.slo.logger)
content_length += self._segment_length(seg_dict)
if slo_etag is None or content_length is None:
etag = md5()
content_length = 0
for seg_dict in segments:
if seg_dict.get('range'):
etag.update('%s:%s;' % (seg_dict['hash'],
seg_dict['range']))
else:
etag.update(seg_dict['hash'])
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=self.slo.logger)
content_length += self._segment_length(seg_dict)
slo_etag = etag.hexdigest()
response_headers = [(h, v) for h, v in resp_headers
if h.lower() not in ('etag', 'content-length')]
response_headers.append(('Content-Length', str(content_length)))
response_headers.append(('Etag', '"%s"' % etag.hexdigest()))
response_headers.append(('Etag', '"%s"' % slo_etag))
if req.method == 'HEAD':
return self._manifest_head_response(req, response_headers)
@ -942,7 +972,6 @@ class StaticLargeObject(object):
resp_body = get_response_body(
out_content_type, {}, problem_segments)
raise HTTPBadRequest(resp_body, content_type=out_content_type)
env = req.environ
slo_etag = md5()
for seg_data in data_for_storage:
@ -952,20 +981,33 @@ class StaticLargeObject(object):
else:
slo_etag.update(seg_data['hash'])
slo_etag = slo_etag.hexdigest()
req.headers.update({
SYSMETA_SLO_ETAG: slo_etag,
SYSMETA_SLO_SIZE: total_size,
'X-Static-Large-Object': 'True',
})
json_data = json.dumps(data_for_storage)
if six.PY3:
json_data = json_data.encode('utf-8')
req.body = json_data
env = req.environ
if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'
env['swift.content_type_overridden'] = True
env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size
env['HTTP_X_STATIC_LARGE_OBJECT'] = 'True'
json_data = json.dumps(data_for_storage)
if six.PY3:
json_data = json_data.encode('utf-8')
env['CONTENT_LENGTH'] = str(len(json_data))
env['wsgi.input'] = BytesIO(json_data)
slo_put_context = SloPutContext(self, slo_etag)
return slo_put_context.handle_slo_put(req, start_response)
def start_response_wrapper(status, headers, exc_info=None):
for i, (header, _value) in enumerate(headers):
if header.lower() == 'etag':
headers[i] = ('Etag', '"%s"' % slo_etag)
break
return start_response(status, headers, exc_info)
return self.app(env, start_response_wrapper)
def get_segments_to_delete_iter(self, req):
"""

View File

@ -154,15 +154,23 @@ class FakeSwift(object):
self._calls.append(
FakeSwiftCall(method, path, HeaderKeyDict(req.headers)))
backend_etag_header = req.headers.get('X-Backend-Etag-Is-At')
conditional_etag = None
if backend_etag_header and backend_etag_header in headers:
# Apply conditional etag overrides
conditional_etag = headers[backend_etag_header]
# range requests ought to work, hence conditional_response=True
if isinstance(body, list):
resp = resp_class(
req=req, headers=headers, app_iter=body,
conditional_response=req.method in ('GET', 'HEAD'))
conditional_response=req.method in ('GET', 'HEAD'),
conditional_etag=conditional_etag)
else:
resp = resp_class(
req=req, headers=headers, body=body,
conditional_response=req.method in ('GET', 'HEAD'))
conditional_response=req.method in ('GET', 'HEAD'),
conditional_etag=conditional_etag)
wsgi_iter = resp(env, start_response)
self.mark_opened(path)
return LeakTrackingIter(wsgi_iter, self, path)

View File

@ -414,7 +414,9 @@ class TestSloPutManifest(SloTestCase):
'/v1/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
self.assertNotIn('X-Static-Large-Object', req.headers)
for h in ('X-Static-Large-Object', 'X-Object-Sysmeta-Slo-Etag',
'X-Object-Sysmeta-Slo-Size'):
self.assertNotIn(h, req.headers)
def my_fake_start_response(*args, **kwargs):
gen_etag = '"' + md5hex('etagoftheobjectsegment') + '"'
@ -423,6 +425,11 @@ class TestSloPutManifest(SloTestCase):
self.slo(req.environ, my_fake_start_response)
self.assertIn('X-Static-Large-Object', req.headers)
self.assertEqual(req.headers['X-Static-Large-Object'], 'True')
self.assertIn('X-Object-Sysmeta-Slo-Etag', req.headers)
self.assertEqual(req.headers['X-Object-Sysmeta-Slo-Etag'],
md5hex('etagoftheobjectsegment'))
self.assertIn('X-Object-Sysmeta-Slo-Size', req.headers)
self.assertEqual(req.headers['X-Object-Sysmeta-Slo-Size'], '100')
self.assertIn('Content-Type', req.headers)
self.assertTrue(
req.headers['Content-Type'].endswith(';swift_bytes=100'),
@ -1078,11 +1085,11 @@ class TestSloDeleteManifest(SloTestCase):
'man-all-there?multipart-manifest=delete'))]))
class TestSloHeadManifest(SloTestCase):
class TestSloHeadOldManifest(SloTestCase):
slo_etag = md5hex("seg01-hashseg02-hash")
def setUp(self):
super(TestSloHeadManifest, self).setUp()
super(TestSloHeadOldManifest, self).setUp()
manifest_json = json.dumps([
{'name': '/gettest/seg01',
'bytes': '100',
@ -1100,6 +1107,8 @@ class TestSloHeadManifest(SloTestCase):
'X-Static-Large-Object': 'true',
'Etag': md5hex(manifest_json)}
manifest_headers.update(getattr(self, 'extra_manifest_headers', {}))
self.manifest_has_sysmeta = all(h in manifest_headers for h in (
'X-Object-Sysmeta-Slo-Etag', 'X-Object-Sysmeta-Slo-Size'))
self.app.register(
'GET', '/v1/AUTH_test/headtest/man',
swob.HTTPOk, manifest_headers, manifest_json)
@ -1116,9 +1125,9 @@ class TestSloHeadManifest(SloTestCase):
self.assertIn(('Content-Type', 'test/data'), headers)
self.assertEqual(body, '') # it's a HEAD request, after all
expected_app_calls = [
('HEAD', '/v1/AUTH_test/headtest/man'),
('GET', '/v1/AUTH_test/headtest/man')]
expected_app_calls = [('HEAD', '/v1/AUTH_test/headtest/man')]
if not self.manifest_has_sysmeta:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_none_match_etag_matching(self):
@ -1132,9 +1141,9 @@ class TestSloHeadManifest(SloTestCase):
self.assertIn(('Content-Length', '0'), headers)
self.assertIn(('Content-Type', 'test/data'), headers)
expected_app_calls = [
('HEAD', '/v1/AUTH_test/headtest/man'),
('GET', '/v1/AUTH_test/headtest/man')]
expected_app_calls = [('HEAD', '/v1/AUTH_test/headtest/man')]
if not self.manifest_has_sysmeta:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_match_etag_not_matching(self):
@ -1148,12 +1157,21 @@ class TestSloHeadManifest(SloTestCase):
self.assertIn(('Content-Length', '0'), headers)
self.assertIn(('Content-Type', 'test/data'), headers)
expected_app_calls = [
('HEAD', '/v1/AUTH_test/headtest/man'),
('GET', '/v1/AUTH_test/headtest/man')]
expected_app_calls = [('HEAD', '/v1/AUTH_test/headtest/man')]
if not self.manifest_has_sysmeta:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
class TestSloHeadManifest(TestSloHeadOldManifest):
def setUp(self):
self.extra_manifest_headers = {
'X-Object-Sysmeta-Slo-Etag': self.slo_etag,
'X-Object-Sysmeta-Slo-Size': '300',
}
super(TestSloHeadManifest, self).setUp()
class TestSloGetRawManifest(SloTestCase):
def setUp(self):
@ -2759,7 +2777,7 @@ class TestSloGetManifest(SloTestCase):
'ERROR: An error occurred while retrieving segments'))
class TestSloConditionalGetManifest(SloTestCase):
class TestSloConditionalGetOldManifest(SloTestCase):
slo_data = [
{'name': '/gettest/a_5', 'hash': md5hex("a" * 5),
'content_type': 'text/plain', 'bytes': '5'},
@ -2772,7 +2790,7 @@ class TestSloConditionalGetManifest(SloTestCase):
slo_etag = md5hex(''.join(seg['hash'] for seg in slo_data))
def setUp(self):
super(TestSloConditionalGetManifest, self).setUp()
super(TestSloConditionalGetOldManifest, self).setUp()
# some plain old objects
self.app.register(
@ -2816,6 +2834,8 @@ class TestSloConditionalGetManifest(SloTestCase):
'X-Static-Large-Object': 'true',
'Etag': md5hex(_abcd_manifest_json)}
manifest_headers.update(getattr(self, 'extra_manifest_headers', {}))
self.manifest_has_sysmeta = all(h in manifest_headers for h in (
'X-Object-Sysmeta-Slo-Etag', 'X-Object-Sysmeta-Slo-Size'))
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-abcd',
swob.HTTPOk, manifest_headers,
@ -2833,13 +2853,16 @@ class TestSloConditionalGetManifest(SloTestCase):
self.assertIn(('Etag', '"%s"' % self.slo_etag), headers)
self.assertEqual(body, '')
expected_app_calls = [
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
# Need to verify the first segment
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
]
expected_app_calls = [('GET', '/v1/AUTH_test/gettest/manifest-abcd')]
if not self.manifest_has_sysmeta:
# We *still* verify the first segment
expected_app_calls.extend([
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
def test_if_none_match_does_not_match(self):
req = Request.blank(
@ -2863,6 +2886,8 @@ class TestSloConditionalGetManifest(SloTestCase):
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get'),
]
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
def test_if_match_matches(self):
req = Request.blank(
@ -2877,17 +2902,21 @@ class TestSloConditionalGetManifest(SloTestCase):
self.assertEqual(
body, 'aaaaabbbbbbbbbbcccccccccccccccdddddddddddddddddddd')
expected_app_calls = [
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
expected_app_calls = [('GET', '/v1/AUTH_test/gettest/manifest-abcd')]
if not self.manifest_has_sysmeta:
# Manifest never matches -> got back a 412; need to re-fetch
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
expected_app_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-abcd'))
expected_app_calls.extend([
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/b_10?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/c_15?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get'),
]
])
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
def test_if_match_does_not_match(self):
req = Request.blank(
@ -2901,15 +2930,18 @@ class TestSloConditionalGetManifest(SloTestCase):
self.assertIn(('Etag', '"%s"' % self.slo_etag), headers)
self.assertEqual(body, '')
expected_app_calls = [
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
# Manifest never matches -> got back a 412; need to re-fetch
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
# We need to verify the first segment
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
]
expected_app_calls = [('GET', '/v1/AUTH_test/gettest/manifest-abcd')]
if not self.manifest_has_sysmeta:
# We *still* verify the first segment
expected_app_calls.extend([
# Manifest never matches -> got back a 412; need to re-fetch
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
def test_if_match_matches_and_range(self):
req = Request.blank(
@ -2935,6 +2967,47 @@ class TestSloConditionalGetManifest(SloTestCase):
('GET', '/v1/AUTH_test/gettest/b_10?multipart-manifest=get'),
]
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
def test_if_match_matches_passthrough(self):
# first fetch and stash the manifest etag
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd?multipart-manifest=get',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK')
headers = HeaderKeyDict(headers)
self.assertEqual('application/json; charset=utf-8',
headers['Content-Type'])
manifest_etag = headers['Etag']
# now use it as a condition and expect to match
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd?multipart-manifest=get',
environ={'REQUEST_METHOD': 'GET'},
headers={'If-Match': manifest_etag})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK')
headers = HeaderKeyDict(headers)
self.assertEqual(manifest_etag, headers['Etag'])
expected_app_calls = [
('GET',
'/v1/AUTH_test/gettest/manifest-abcd?multipart-manifest=get')] * 2
self.assertEqual(self.app.calls, expected_app_calls)
self.assertNotIn('X-Backend-Etag-Is-At', self.app.headers[0])
self.assertNotIn('X-Backend-Etag-Is-At', self.app.headers[1])
class TestSloConditionalGetNewManifest(TestSloConditionalGetOldManifest):
def setUp(self):
self.extra_manifest_headers = {
'X-Object-Sysmeta-Slo-Etag': self.slo_etag,
'X-Object-Sysmeta-Slo-Size': '50',
}
super(TestSloConditionalGetNewManifest, self).setUp()
class TestSloBulkLogger(unittest.TestCase):