Let clients request heartbeats during SLO PUTs

An SLO PUT requires that we HEAD every referenced object; as a result, it
can be a very time-intensive operation. This makes it difficult as a
client to differentiate between a proxy-server that's still doing work and
one that's crashed but left the socket open.

Now, clients can opt-in to receiving heartbeats during long-running PUTs
by including the query parameter

    heartbeat=on

With heartbeating turned on, the proxy will start its response immediately
with 202 Accepted then send a single whitespace character periodically
until the request completes. At that point, a final summary chunk will be
sent which includes a "Response Status" key indicating success or failure
and (if successful) an "Etag" key indicating the Etag of the resulting SLO.

This mechanism is very similar to the way bulk extractions and deletions
work, and even the way SLO behaves for ?multipart-manifest=delete requests.

Note that this is opt-in: this prevents us from sending the 202 response
to existing clients that may mis-interpret it as an immediate indication
of success.

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Related-Bug: 1718811
Change-Id: I65cee5f629c87364e188aa05a06d563c3849c8f3
This commit is contained in:
Tim Burke 2017-10-03 00:11:07 +00:00 committed by Alistair Coles
parent 29e9ae1cc5
commit 77a8a4455d
6 changed files with 525 additions and 66 deletions

View File

@ -51,7 +51,7 @@ To create a static large object, divide your content into pieces and
create (upload) a segment object to contain each piece.
Create a manifest object. Include the ``multipart-manifest=put``
query string at the end of the manifest object name to indicate that
query parameter at the end of the manifest object name to indicate that
this is a manifest object.
The body of the **PUT** request on the manifest object comprises a json
@ -113,9 +113,22 @@ of the concatenated ``ETag`` values of the object segments. You can also
set the ``Content-Type`` request header and custom object metadata.
When the **PUT** operation sees the ``multipart-manifest=put`` query
string, it reads the request body and verifies that each segment
parameter, it reads the request body and verifies that each segment
object exists and that the sizes and ETags match. If there is a
mismatch, the **PUT**\ operation fails.
mismatch, the **PUT** operation fails.
This verification process can take a long time to complete, particularly
as the number of segments increases. You may include a ``heartbeat=on``
query parameter to have the server:
1. send a ``202 Accepted`` response before it begins validating segments,
2. periodically send whitespace characters to keep the connection alive, and
3. send a final response code in the body.
.. note::
The server may still immediately respond with ``400 Bad Request``
if it can determine that the request is invalid before making
backend requests.
If everything matches, the manifest object is created. The
``X-Static-Large-Object`` metadata is set to ``true`` indicating that
@ -124,18 +137,18 @@ this is a static object manifest.
Normally when you perform a **GET** operation on the manifest object,
the response body contains the concatenated content of the segment
objects. To download the manifest list, use the
``multipart-manifest=get`` query string. The resulting list is not
``multipart-manifest=get`` query parameter. The resulting list is not
formatted the same as the manifest you originally used in the **PUT**
operation.
If you use the **DELETE** operation on a manifest object, the manifest
object is deleted. The segment objects are not affected. However, if you
add the ``multipart-manifest=delete`` query string, the segment
add the ``multipart-manifest=delete`` query parameter, the segment
objects are deleted and if all are successfully deleted, the manifest
object is also deleted.
To change the manifest, use a **PUT** operation with the
``multipart-manifest=put`` query string. This request creates a
``multipart-manifest=put`` query parameter. This request creates a
manifest object. You can also update the object metadata in the usual
way.
@ -326,7 +339,7 @@ a manifest object but a normal object with content same as what you would
get on a **GET** request to the original manifest object.
To copy the manifest object, you include the ``multipart-manifest=get``
query string in the **COPY** request. The new object contains the same
query parameter in the **COPY** request. The new object contains the same
manifest as the original. The segment objects are not copied. Instead,
both the original and new manifest objects share the same set of segment
objects.

View File

@ -766,6 +766,11 @@ use = egg:swift#slo
# Default is to use the concurrency value from above; all of the same caveats
# apply regarding recommended ranges.
# delete_concurrency = 2
#
# In order to keep a connection active during a potentially long PUT request,
# clients may request that Swift send whitespace ahead of the final response
# body. This whitespace will be yielded at most every yield_frequency seconds.
# yield_frequency = 10
# Note: Put after auth and staticweb in the pipeline.
# If you don't put it in the pipeline, it will be inserted for you.

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
r"""
Middleware that will provide Static Large Object (SLO) support.
This feature is very similar to Dynamic Large Object (DLO) support in that
@ -72,6 +72,33 @@ found, size/etag mismatch, below minimum size, invalid range) then the user
will receive a 4xx error response. If everything does match, the user will
receive a 2xx response and the SLO object is ready for downloading.
Note that large manifests may take a long time to verify; historically,
clients would need to use a long read timeout for the connection to give
Swift enough time to send a final ``201 Created`` or ``400 Bad Request``
response. Now, clients should use the query parameters::
?multipart-manifest=put&heartbeat=on
to request that Swift send an immediate ``202 Accepted`` response and periodic
whitespace to keep the connection alive. A final response code will appear in
the body. The format of the response body defaults to text/plain but can be
either json or xml depending on the ``Accept`` header. An example body is as
follows::
Response Status: 201 Created
Response Body:
Etag: "8f481cede6d2ddc07cb36aa084d9a64d"
Last Modified: Wed, 25 Oct 2017 17:08:55 GMT
Errors:
Or, as a json response::
{"Response Status": "201 Created",
"Response Body": "",
"Etag": "\"8f481cede6d2ddc07cb36aa084d9a64d\"",
"Last Modified": "Wed, 25 Oct 2017 17:08:55 GMT",
"Errors": []}
Behind the scenes, on success, a JSON manifest generated from the user input is
sent to object servers with an extra ``X-Static-Large-Object: True`` header
and a modified ``Content-Type``. The items in this manifest will include the
@ -251,12 +278,14 @@ import json
import mimetypes
import re
import six
import time
from hashlib import md5
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \
HTTPUnauthorized, HTTPConflict, HTTPUnprocessableEntity, Response, Range
HTTPUnauthorized, HTTPConflict, HTTPUnprocessableEntity, Response, Range, \
RESPONSE_REASONS
from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
@ -273,6 +302,7 @@ from swift.common.middleware.bulk import get_response_body, \
DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 * 1024 # 1 MiB
DEFAULT_MAX_MANIFEST_SEGMENTS = 1000
DEFAULT_MAX_MANIFEST_SIZE = 1024 * 1024 * 2 # 2 MiB
DEFAULT_YIELD_FREQUENCY = 10
REQUIRED_SLO_KEYS = set(['path'])
@ -860,16 +890,26 @@ class StaticLargeObject(object):
:param app: The next WSGI filter or app in the paste.deploy chain.
:param conf: The configuration dict for the middleware.
:param max_manifest_segments: The maximum number of segments allowed in
newly-created static large objects.
:param max_manifest_size: The maximum size (in bytes) of newly-created
static-large-object manifests.
:param yield_frequency: If the client included ``heartbeat=on`` in the
query parameters when creating a new static large
object, the period of time to wait between sending
whitespace to keep the connection alive.
"""
def __init__(self, app, conf,
max_manifest_segments=DEFAULT_MAX_MANIFEST_SEGMENTS,
max_manifest_size=DEFAULT_MAX_MANIFEST_SIZE):
max_manifest_size=DEFAULT_MAX_MANIFEST_SIZE,
yield_frequency=DEFAULT_YIELD_FREQUENCY):
self.conf = conf
self.app = app
self.logger = get_logger(conf, log_route='slo')
self.max_manifest_segments = max_manifest_segments
self.max_manifest_size = max_manifest_size
self.yield_frequency = yield_frequency
self.max_get_time = int(self.conf.get('max_get_time', 86400))
self.rate_limit_under_size = int(self.conf.get(
'rate_limit_under_size', DEFAULT_RATE_LIMIT_UNDER_SIZE))
@ -928,7 +968,6 @@ class StaticLargeObject(object):
raise HTTPRequestEntityTooLarge(
'Number of segments must be <= %d' %
self.max_manifest_segments)
total_size = 0
try:
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
except ValueError:
@ -952,6 +991,7 @@ class StaticLargeObject(object):
return obj_name, sub_req.get_response(self)
def validate_seg_dict(seg_dict, head_seg_resp, allow_empty_segment):
obj_name = seg_dict['path']
if not head_seg_resp.is_success:
problem_segments.append([quote(obj_name),
head_seg_resp.status])
@ -1009,61 +1049,115 @@ class StaticLargeObject(object):
seg_data['sub_slo'] = True
return segment_length, seg_data
heartbeat = config_true_value(req.params.get('heartbeat'))
separator = ''
if heartbeat:
# Apparently some ways of deploying require that this to happens
# *before* the return? Not sure why.
req.environ['eventlet.minimum_write_chunk_size'] = 0
start_response('202 Accepted', [ # NB: not 201 !
('Content-Type', out_content_type),
])
separator = '\r\n\r\n'
data_for_storage = [None] * len(parsed_data)
with StreamingPile(self.concurrency) as pile:
for obj_name, resp in pile.asyncstarmap(do_head, (
(path, ) for path in path2indices)):
for i in path2indices[obj_name]:
segment_length, seg_data = validate_seg_dict(
parsed_data[i], resp,
allow_empty_segment=(i == len(parsed_data) - 1))
data_for_storage[i] = seg_data
total_size += segment_length
if problem_segments:
resp_body = get_response_body(
out_content_type, {}, problem_segments, 'upload')
raise HTTPBadRequest(resp_body, content_type=out_content_type)
def resp_iter():
total_size = 0
# wsgi won't propagate start_response calls until some data has
# been yielded so make sure first heartbeat is sent immediately
if heartbeat:
yield ' '
last_yield_time = time.time()
with StreamingPile(self.concurrency) as pile:
for obj_name, resp in pile.asyncstarmap(do_head, (
(path, ) for path in path2indices)):
now = time.time()
if heartbeat and (now - last_yield_time >
self.yield_frequency):
# Make sure we've called start_response before
# sending data
yield ' '
last_yield_time = now
for i in path2indices[obj_name]:
segment_length, seg_data = validate_seg_dict(
parsed_data[i], resp,
allow_empty_segment=(i == len(parsed_data) - 1))
data_for_storage[i] = seg_data
total_size += segment_length
slo_etag = md5()
for seg_data in data_for_storage:
if seg_data.get('range'):
slo_etag.update('%s:%s;' % (seg_data['hash'],
seg_data['range']))
if problem_segments:
err = HTTPBadRequest(content_type=out_content_type)
resp_dict = {}
if heartbeat:
resp_dict['Response Status'] = err.status
resp_dict['Response Body'] = err.body or '\n'.join(
RESPONSE_REASONS.get(err.status_int, ['']))
else:
start_response(err.status,
[(h, v) for h, v in err.headers.items()
if h.lower() != 'content-length'])
yield separator + get_response_body(
out_content_type, resp_dict, problem_segments, 'upload')
return
slo_etag = md5()
for seg_data in data_for_storage:
if seg_data.get('range'):
slo_etag.update('%s:%s;' % (seg_data['hash'],
seg_data['range']))
else:
slo_etag.update(seg_data['hash'])
slo_etag = slo_etag.hexdigest()
client_etag = req.headers.get('Etag')
if client_etag and client_etag.strip('"') != slo_etag:
err = HTTPUnprocessableEntity(request=req)
if heartbeat:
yield separator + get_response_body(out_content_type, {
'Response Status': err.status,
'Response Body': err.body or '\n'.join(
RESPONSE_REASONS.get(err.status_int, [''])),
}, problem_segments, 'upload')
else:
for chunk in err(req.environ, start_response):
yield chunk
return
json_data = json.dumps(data_for_storage)
if six.PY3:
json_data = json_data.encode('utf-8')
req.body = json_data
req.headers.update({
SYSMETA_SLO_ETAG: slo_etag,
SYSMETA_SLO_SIZE: total_size,
'X-Static-Large-Object': 'True',
'Etag': md5(json_data).hexdigest(),
})
env = req.environ
if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = (guessed_type or
'application/octet-stream')
env['swift.content_type_overridden'] = True
env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size
resp = req.get_response(self.app)
resp_dict = {'Response Status': resp.status}
if resp.is_success:
resp.etag = slo_etag
resp_dict['Etag'] = resp.headers['Etag']
resp_dict['Last Modified'] = resp.headers['Last-Modified']
if heartbeat:
resp_dict['Response Body'] = resp.body
yield separator + get_response_body(
out_content_type, resp_dict, [], 'upload')
else:
slo_etag.update(seg_data['hash'])
for chunk in resp(req.environ, start_response):
yield chunk
slo_etag = slo_etag.hexdigest()
client_etag = req.headers.get('Etag')
if client_etag and client_etag.strip('"') != slo_etag:
raise HTTPUnprocessableEntity(request=req)
json_data = json.dumps(data_for_storage)
if six.PY3:
json_data = json_data.encode('utf-8')
req.body = json_data
req.headers.update({
SYSMETA_SLO_ETAG: slo_etag,
SYSMETA_SLO_SIZE: total_size,
'X-Static-Large-Object': 'True',
'Etag': md5(json_data).hexdigest(),
})
env = req.environ
if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'
env['swift.content_type_overridden'] = True
env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size
def start_response_wrapper(status, headers, exc_info=None):
for i, (header, _value) in enumerate(headers):
if header.lower() == 'etag':
headers[i] = ('Etag', '"%s"' % slo_etag)
break
return start_response(status, headers, exc_info)
return self.app(env, start_response_wrapper)
return resp_iter()
def get_segments_to_delete_iter(self, req):
"""
@ -1212,10 +1306,13 @@ def filter_factory(global_conf, **local_conf):
DEFAULT_MAX_MANIFEST_SEGMENTS))
max_manifest_size = int(conf.get('max_manifest_size',
DEFAULT_MAX_MANIFEST_SIZE))
yield_frequency = int(conf.get('yield_frequency',
DEFAULT_YIELD_FREQUENCY))
register_swift_info('slo',
max_manifest_segments=max_manifest_segments,
max_manifest_size=max_manifest_size,
yield_frequency=yield_frequency,
# this used to be configurable; report it as 1 for
# clients that might still care
min_segment_size=1)
@ -1224,5 +1321,6 @@ def filter_factory(global_conf, **local_conf):
return StaticLargeObject(
app, conf,
max_manifest_segments=max_manifest_segments,
max_manifest_size=max_manifest_size)
max_manifest_size=max_manifest_size,
yield_frequency=yield_frequency)
return slo_filter

View File

@ -347,6 +347,8 @@ class Connection(object):
self.connection.send('0\r\n\r\n')
self.response = self.connection.getresponse()
# Hope it isn't big!
self.response.body = self.response.read()
self.connection.close()
return self.response.status

View File

@ -792,6 +792,101 @@ class TestSlo(Base):
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
def test_slo_put_heartbeating(self):
if 'yield_frequency' not in cluster_info['slo']:
# old swift?
raise SkipTest('Swift does not seem to support heartbeating')
def do_put(headers=None, include_error=False):
file_item = self.env.container.file("manifest-heartbeat")
seg_info = self.env.seg_info
manifest_data = [seg_info['seg_a'], seg_info['seg_b'],
seg_info['seg_c'], seg_info['seg_d'],
seg_info['seg_e']]
if include_error:
manifest_data.append({'path': 'non-existent/segment'})
resp = file_item.write(
json.dumps(manifest_data),
parms={'multipart-manifest': 'put', 'heartbeat': 'on'},
hdrs=headers, return_resp=True)
self.assertEqual(resp.status, 202)
self.assertTrue(resp.chunked)
body_lines = resp.body.split('\n', 2)
self.assertFalse(body_lines[0].strip()) # all whitespace
self.assertEqual('\r', body_lines[1])
return body_lines[2]
body_lines = do_put().split('\n')
self.assertIn('Response Status: 201 Created', body_lines)
self.assertIn('Etag', [line.split(':', 1)[0] for line in body_lines])
self.assertIn('Last Modified', [line.split(':', 1)[0]
for line in body_lines])
body_lines = do_put({'Accept': 'text/plain'}).split('\n')
self.assertIn('Response Status: 201 Created', body_lines)
self.assertIn('Etag', [line.split(':', 1)[0] for line in body_lines])
self.assertIn('Last Modified', [line.split(':', 1)[0]
for line in body_lines])
body = do_put({'Accept': 'application/json'})
try:
resp = json.loads(body)
except ValueError:
self.fail('Expected JSON, got %r' % body)
self.assertIn('Etag', resp)
del resp['Etag']
self.assertIn('Last Modified', resp)
del resp['Last Modified']
self.assertEqual(resp, {
'Response Status': '201 Created',
'Response Body': '',
'Errors': [],
})
body_lines = do_put(include_error=True).split('\n')
self.assertIn('Response Status: 400 Bad Request', body_lines)
self.assertIn('Response Body: Bad Request', body_lines)
self.assertNotIn('Etag', [line.split(':', 1)[0]
for line in body_lines])
self.assertNotIn('Last Modified', [line.split(':', 1)[0]
for line in body_lines])
self.assertEqual(body_lines[-3:], [
'Errors:',
'non-existent/segment, 404 Not Found',
'',
])
body = do_put({'Accept': 'application/json'}, include_error=True)
try:
resp = json.loads(body)
except ValueError:
self.fail('Expected JSON, got %r' % body)
self.assertNotIn('Etag', resp)
self.assertNotIn('Last Modified', resp)
self.assertEqual(resp, {
'Response Status': '400 Bad Request',
'Response Body': 'Bad Request\nThe server could not comply with '
'the request since it is either malformed or '
'otherwise incorrect.',
'Errors': [
['non-existent/segment', '404 Not Found'],
],
})
body = do_put({'Accept': 'application/json', 'ETag': 'bad etag'})
try:
resp = json.loads(body)
except ValueError:
self.fail('Expected JSON, got %r' % body)
self.assertNotIn('Etag', resp)
self.assertNotIn('Last Modified', resp)
self.assertEqual(resp, {
'Response Status': '422 Unprocessable Entity',
'Response Body': 'Unprocessable Entity\nUnable to process the '
'contained instructions',
'Errors': [],
})
def _make_manifest(self):
file_item = self.env.container.file("manifest-post")
seg_info = self.env.seg_info

View File

@ -324,6 +324,9 @@ class TestSloPutManifest(SloTestCase):
self.app.register(
'PUT', '/', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/AUTH_test/cont/missing-object',
swob.HTTPNotFound, {}, None)
self.app.register(
'HEAD', '/v1/AUTH_test/cont/object',
swob.HTTPOk,
@ -355,7 +358,8 @@ class TestSloPutManifest(SloTestCase):
{'Content-Length': '1', 'Etag': 'a'},
None)
self.app.register(
'PUT', '/v1/AUTH_test/c/man', swob.HTTPCreated, {}, None)
'PUT', '/v1/AUTH_test/c/man', swob.HTTPCreated,
{'Last-Modified': 'Fri, 01 Feb 2012 20:38:36 GMT'}, None)
self.app.register(
'DELETE', '/v1/AUTH_test/c/man', swob.HTTPNoContent, {}, None)
@ -444,6 +448,219 @@ class TestSloPutManifest(SloTestCase):
'Content-Type %r does not end with swift_bytes=100' %
req.headers['Content-Type'])
@patch('swift.common.middleware.slo.time')
def test_handle_multipart_put_fast_heartbeat(self, mock_time):
mock_time.time.side_effect = [
0, # start time
1, # first segment's fast
2, # second segment's also fast!
]
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual('202 Accepted', status)
headers_found = [h.lower() for h, v in headers]
self.assertNotIn('etag', headers_found)
body = ''.join(body)
gen_etag = '"' + md5hex('etagoftheobjectsegment' * 2) + '"'
self.assertTrue(body.startswith(' \r\n\r\n'),
'Expected body to start with single space and two '
'blank lines; got %r' % body)
self.assertIn('\nResponse Status: 201 Created\n', body)
self.assertIn('\nResponse Body: \n', body)
self.assertIn('\nEtag: %s\n' % gen_etag, body)
self.assertIn('\nLast Modified: Fri, 01 Feb 2012 20:38:36 GMT\n', body)
@patch('swift.common.middleware.slo.time')
def test_handle_multipart_long_running_put_success(self, mock_time):
mock_time.time.side_effect = [
0, # start time
1, # first segment's fast
20, # second segment's slow
]
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual('202 Accepted', status)
headers_found = [h.lower() for h, v in headers]
self.assertNotIn('etag', headers_found)
body = ''.join(body)
gen_etag = '"' + md5hex('etagoftheobjectsegment' * 2) + '"'
self.assertTrue(body.startswith(' \r\n\r\n'),
'Expected body to start with two spaces and two '
'blank lines; got %r' % body)
self.assertIn('\nResponse Status: 201 Created\n', body)
self.assertIn('\nResponse Body: \n', body)
self.assertIn('\nEtag: %s\n' % gen_etag, body)
self.assertIn('\nLast Modified: Fri, 01 Feb 2012 20:38:36 GMT\n', body)
@patch('swift.common.middleware.slo.time')
def test_handle_multipart_long_running_put_success_json(self, mock_time):
mock_time.time.side_effect = [
0, # start time
11, # first segment's slow
22, # second segment's also slow
]
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Accept': 'application/json'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual('202 Accepted', status)
headers_found = [h.lower() for h, v in headers]
self.assertNotIn('etag', headers_found)
body = ''.join(body)
gen_etag = '"' + md5hex('etagoftheobjectsegment' * 2) + '"'
self.assertTrue(body.startswith(' \r\n\r\n'),
'Expected body to start with three spaces and two '
'blank lines; got %r' % body)
body = json.loads(body)
self.assertEqual(body['Response Status'], '201 Created')
self.assertEqual(body['Response Body'], '')
self.assertEqual(body['Etag'], gen_etag)
self.assertEqual(body['Last Modified'],
'Fri, 01 Feb 2012 20:38:36 GMT')
@patch('swift.common.middleware.slo.time')
def test_handle_multipart_long_running_put_failure(self, mock_time):
mock_time.time.side_effect = [
0, # start time
1, # first segment's fast
20, # second segment's slow
]
test_json_data = json.dumps([{'path': u'/cont/missing-object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 99}])
req = Request.blank(
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual('202 Accepted', status)
headers_found = [h.lower() for h, v in headers]
self.assertNotIn('etag', headers_found)
body = ''.join(body).split('\n')
self.assertEqual([' \r', '\r'], body[:2],
'Expected body to start with two spaces and two '
'blank lines; got %r' % '\n'.join(body))
self.assertIn('Response Status: 400 Bad Request', body[2:5])
self.assertIn('Response Body: Bad Request', body)
self.assertIn('The server could not comply with the request since it '
'is either malformed or otherwise incorrect.', body)
self.assertFalse(any(line.startswith('Etag: ') for line in body))
self.assertFalse(any(line.startswith('Last Modified: ')
for line in body))
self.assertEqual(body[-4], 'Errors:')
self.assertEqual(sorted(body[-3:-1]), [
'/cont/missing-object, 404 Not Found',
'/cont/object, Size Mismatch',
])
self.assertEqual(body[-1], '')
@patch('swift.common.middleware.slo.time')
def test_handle_multipart_long_running_put_failure_json(self, mock_time):
mock_time.time.side_effect = [
0, # start time
11, # first segment's slow
22, # second segment's also slow
]
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobjectsegment',
'size_bytes': 99},
{'path': '/cont/object',
'etag': 'some other etag',
'size_bytes': 100}])
req = Request.blank(
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Accept': 'application/json'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual('202 Accepted', status)
headers_found = [h.lower() for h, v in headers]
self.assertNotIn('etag', headers_found)
body = ''.join(body)
self.assertTrue(body.startswith(' \r\n\r\n'),
'Expected body to start with three spaces and two '
'blank lines; got %r' % body)
body = json.loads(body)
self.assertEqual(body['Response Status'], '400 Bad Request')
self.assertEqual(body['Response Body'], 'Bad Request\nThe server '
'could not comply with the request since it is '
'either malformed or otherwise incorrect.')
self.assertNotIn('Etag', body)
self.assertNotIn('Last Modified', body)
self.assertEqual(sorted(body['Errors']), [
['/cont/object', 'Etag Mismatch'],
[quote(u'/cont/object\u2661'.encode('utf8')), 'Size Mismatch'],
])
@patch('swift.common.middleware.slo.time')
def test_handle_multipart_long_running_put_bad_etag_json(self, mock_time):
mock_time.time.side_effect = [
0, # start time
11, # first segment's slow
22, # second segment's also slow
]
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Accept': 'application/json', 'ETag': 'bad etag'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual('202 Accepted', status)
headers_found = [h.lower() for h, v in headers]
self.assertNotIn('etag', headers_found)
body = ''.join(body)
self.assertTrue(body.startswith(' \r\n\r\n'),
'Expected body to start with three spaces and two '
'blank lines; got %r' % body)
body = json.loads(body)
self.assertEqual(body['Response Status'], '422 Unprocessable Entity')
self.assertEqual('Unprocessable Entity\nUnable to process the '
'contained instructions', body['Response Body'])
self.assertNotIn('Etag', body)
self.assertNotIn('Last Modified', body)
self.assertEqual(body['Errors'], [])
def test_manifest_put_no_etag_success(self):
req = Request.blank(
'/v1/AUTH_test/c/man?multipart-manifest=put',
@ -476,10 +693,10 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual(resp.status_int, 422)
def test_handle_multipart_put_disallow_empty_first_segment(self):
test_json_data = json.dumps([{'path': '/cont/object',
test_json_data = json.dumps([{'path': '/cont/small_object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 0},
{'path': '/cont/small_object',
{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank('/v1/a/c/o?multipart-manifest=put',
@ -3093,6 +3310,35 @@ class TestSwiftInfo(unittest.TestCase):
self.assertEqual(swift_info['slo'].get('min_segment_size'), 1)
self.assertEqual(swift_info['slo'].get('max_manifest_size'),
mware.max_manifest_size)
self.assertEqual(1000, mware.max_manifest_segments)
self.assertEqual(2097152, mware.max_manifest_size)
self.assertEqual(1048576, mware.rate_limit_under_size)
self.assertEqual(10, mware.rate_limit_after_segment)
self.assertEqual(1, mware.rate_limit_segments_per_sec)
self.assertEqual(10, mware.yield_frequency)
self.assertEqual(2, mware.concurrency)
self.assertEqual(2, mware.bulk_deleter.delete_concurrency)
def test_registered_non_defaults(self):
conf = dict(
max_manifest_segments=500, max_manifest_size=1048576,
rate_limit_under_size=2097152, rate_limit_after_segment=20,
rate_limit_segments_per_sec=2, yield_frequency=5, concurrency=1,
delete_concurrency=3)
mware = slo.filter_factory(conf)('have to pass in an app')
swift_info = utils.get_swift_info()
self.assertTrue('slo' in swift_info)
self.assertEqual(swift_info['slo'].get('max_manifest_segments'), 500)
self.assertEqual(swift_info['slo'].get('min_segment_size'), 1)
self.assertEqual(swift_info['slo'].get('max_manifest_size'), 1048576)
self.assertEqual(500, mware.max_manifest_segments)
self.assertEqual(1048576, mware.max_manifest_size)
self.assertEqual(2097152, mware.rate_limit_under_size)
self.assertEqual(20, mware.rate_limit_after_segment)
self.assertEqual(2, mware.rate_limit_segments_per_sec)
self.assertEqual(5, mware.yield_frequency)
self.assertEqual(1, mware.concurrency)
self.assertEqual(3, mware.bulk_deleter.delete_concurrency)
if __name__ == '__main__':
unittest.main()