Implement heartbeat response for COPY request
This will prevent client timeouts during large object copies, similar to existing support in bulk operations and SLO manifest writes. The operator can set the frequency using the yield_frequency inside conf file. To use heartbeat, simply add "heartbeat=on" query parameter, which will: - Returns a 202 Accepted response - Sends periodic 1 byte whitespace heartbeat every yield_frequency - Returns final status and headers - Supports both plain text and JSON Closes-Bug: #1738306 Change-Id: Iaf4355f2e0edbd6dde226231f3d7a61a5e53ecfa
This commit is contained in:
@@ -194,9 +194,8 @@ swift.source set and the content length will reflect the size of the
|
||||
payload sent to the proxy (the list of objects/containers to be deleted).
|
||||
"""
|
||||
|
||||
import json
|
||||
from swift.common.request_helpers import get_heartbeat_response_body
|
||||
import tarfile
|
||||
from xml.sax.saxutils import escape # nosec B406
|
||||
from time import time
|
||||
from eventlet import sleep
|
||||
import zlib
|
||||
@@ -224,50 +223,6 @@ ACCEPTABLE_FORMATS = ['text/plain', 'application/json', 'application/xml',
|
||||
'text/xml']
|
||||
|
||||
|
||||
def get_response_body(data_format, data_dict, error_list, root_tag):
|
||||
"""
|
||||
Returns a properly formatted response body according to format.
|
||||
|
||||
Handles json and xml, otherwise will return text/plain.
|
||||
Note: xml response does not include xml declaration.
|
||||
|
||||
:params data_format: resulting format
|
||||
:params data_dict: generated data about results.
|
||||
:params error_list: list of quoted filenames that failed
|
||||
:params root_tag: the tag name to use for root elements when returning XML;
|
||||
e.g. 'extract' or 'delete'
|
||||
"""
|
||||
if data_format == 'application/json':
|
||||
data_dict['Errors'] = error_list
|
||||
return json.dumps(data_dict).encode('ascii')
|
||||
if data_format and data_format.endswith('/xml'):
|
||||
output = ['<', root_tag, '>\n']
|
||||
for key in sorted(data_dict):
|
||||
xml_key = key.replace(' ', '_').lower()
|
||||
output.extend([
|
||||
'<', xml_key, '>',
|
||||
escape(str(data_dict[key])),
|
||||
'</', xml_key, '>\n',
|
||||
])
|
||||
output.append('<errors>\n')
|
||||
for name, status in error_list:
|
||||
output.extend([
|
||||
'<object><name>', escape(name), '</name><status>',
|
||||
escape(status), '</status></object>\n',
|
||||
])
|
||||
output.extend(['</errors>\n</', root_tag, '>\n'])
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
||||
output = []
|
||||
for key in sorted(data_dict):
|
||||
output.append('%s: %s\n' % (key, data_dict[key]))
|
||||
output.append('Errors:\n')
|
||||
output.extend(
|
||||
'%s, %s\n' % (name, status)
|
||||
for name, status in error_list)
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
||||
|
||||
def pax_key_to_swift_header(pax_key):
|
||||
if (pax_key == u"SCHILY.xattr.user.mime_type" or
|
||||
pax_key == u"LIBARCHIVE.xattr.user.mime_type"):
|
||||
@@ -506,8 +461,9 @@ class Bulk(object):
|
||||
self.logger.exception('Error in bulk delete.')
|
||||
resp_dict['Response Status'] = HTTPServerError().status
|
||||
|
||||
yield separator + get_response_body(out_content_type,
|
||||
resp_dict, failed_files, 'delete')
|
||||
yield separator + get_heartbeat_response_body(out_content_type,
|
||||
resp_dict, failed_files,
|
||||
'delete')
|
||||
|
||||
def handle_extract_iter(self, req, compress_type,
|
||||
out_content_type='text/plain'):
|
||||
@@ -671,7 +627,7 @@ class Bulk(object):
|
||||
self.logger.exception('Error in extract archive.')
|
||||
resp_dict['Response Status'] = HTTPServerError().status
|
||||
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, failed_files, 'extract')
|
||||
|
||||
def _process_delete(self, resp, pile, obj_name, resp_dict,
|
||||
|
@@ -125,6 +125,8 @@ from swift.common.request_helpers import copy_header_subset, remove_items, \
|
||||
is_sys_meta, is_sys_or_user_meta, is_object_transient_sysmeta, \
|
||||
check_path_header, OBJECT_SYSMETA_CONTAINER_UPDATE_OVERRIDE_PREFIX
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest
|
||||
import eventlet
|
||||
from swift.common.request_helpers import get_heartbeat_response_body
|
||||
|
||||
|
||||
def _check_copy_from_header(req):
|
||||
@@ -175,10 +177,11 @@ def _copy_headers(src, dest):
|
||||
|
||||
class ServerSideCopyWebContext(WSGIContext):
|
||||
|
||||
def __init__(self, app, logger):
|
||||
def __init__(self, app, logger, yield_frequency=10):
|
||||
super(ServerSideCopyWebContext, self).__init__(app)
|
||||
self.app = app
|
||||
self.logger = logger
|
||||
self.yield_frequency = yield_frequency
|
||||
|
||||
def get_source_resp(self, req):
|
||||
sub_req = make_subrequest(
|
||||
@@ -187,12 +190,73 @@ class ServerSideCopyWebContext(WSGIContext):
|
||||
return sub_req.get_response(self.app)
|
||||
|
||||
def send_put_req(self, req, additional_resp_headers, start_response):
|
||||
app_resp = self._app_call(req.environ)
|
||||
self._adjust_put_response(req, additional_resp_headers)
|
||||
start_response(self._response_status,
|
||||
self._response_headers,
|
||||
self._response_exc_info)
|
||||
return app_resp
|
||||
heartbeat = config_true_value(req.params.get('heartbeat'))
|
||||
ACCEPTABLE_FORMATS = ['text/plain', 'application/json']
|
||||
|
||||
try:
|
||||
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
|
||||
except ValueError:
|
||||
out_content_type = 'text/plain'
|
||||
if not out_content_type:
|
||||
out_content_type = 'text/plain'
|
||||
|
||||
if heartbeat:
|
||||
gt = eventlet.spawn(self._app_call,
|
||||
req.environ)
|
||||
start_response('202 Accepted',
|
||||
[('Content-Type', out_content_type)])
|
||||
|
||||
def resp_iter():
|
||||
# Send an initial heartbeat
|
||||
yield b' '
|
||||
app_iter = [b'']
|
||||
try:
|
||||
while not gt.dead:
|
||||
try:
|
||||
with eventlet.Timeout(self.yield_frequency):
|
||||
app_iter = gt.wait()
|
||||
except eventlet.Timeout:
|
||||
yield b' '
|
||||
except Exception as e:
|
||||
# Send back the status to the client if error
|
||||
self._response_status = '500 Internal Error'
|
||||
app_iter = [str(e).encode('utf8')]
|
||||
finally:
|
||||
response_body = b''.join(app_iter).decode('utf8')
|
||||
resp_dict = {'Response Status': self._response_status,
|
||||
'Response Body': response_body}
|
||||
errors = []
|
||||
|
||||
if not is_success(self._get_status_int()):
|
||||
src_path = additional_resp_headers['X-Copied-From']
|
||||
errors.append((
|
||||
wsgi_quote(src_path),
|
||||
self._response_status,
|
||||
))
|
||||
|
||||
for k, v in additional_resp_headers.items():
|
||||
if not k.lower().startswith(('x-object-sysmeta-',
|
||||
'x-backend')):
|
||||
resp_dict[k] = v
|
||||
|
||||
for k, v in self._response_headers:
|
||||
if not k.lower().startswith(('x-object-sysmeta-',
|
||||
'x-backend')):
|
||||
resp_dict[k] = v
|
||||
yield get_heartbeat_response_body(out_content_type,
|
||||
resp_dict,
|
||||
errors, 'copy')
|
||||
close_if_possible(gt)
|
||||
|
||||
return resp_iter()
|
||||
|
||||
else:
|
||||
app_resp = self._app_call(req.environ)
|
||||
self._adjust_put_response(req, additional_resp_headers)
|
||||
start_response(self._response_status,
|
||||
self._response_headers,
|
||||
self._response_exc_info)
|
||||
return app_resp
|
||||
|
||||
def _adjust_put_response(self, req, additional_resp_headers):
|
||||
if is_success(self._get_status_int()):
|
||||
@@ -220,6 +284,7 @@ class ServerSideCopyMiddleware(object):
|
||||
def __init__(self, app, conf):
|
||||
self.app = app
|
||||
self.logger = get_logger(conf, log_route="copy")
|
||||
self.yield_frequency = int(conf.get('yield_frequency', 10))
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
req = Request(env)
|
||||
@@ -333,6 +398,15 @@ class ServerSideCopyMiddleware(object):
|
||||
'body', request=req,
|
||||
content_type='text/plain')(req.environ,
|
||||
start_response)
|
||||
# If heartbeat is enabled, set minimum_write_chunk_size directly
|
||||
# in the original client request before making subrequests
|
||||
if config_true_value(req.params.get('heartbeat')):
|
||||
wsgi_input = req.environ.get('wsgi.input')
|
||||
if hasattr(wsgi_input, 'environ'):
|
||||
wsgi_input.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
# Not sure if we also need to set it in
|
||||
# the current request's environ
|
||||
req.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
|
||||
# Form the path of source object to be fetched
|
||||
ver, acct, _rest = req.split_path(2, 3, True)
|
||||
@@ -347,7 +421,8 @@ class ServerSideCopyMiddleware(object):
|
||||
src_container_name, src_obj_name)
|
||||
|
||||
# GET the source object, bail out on error
|
||||
ssc_ctx = ServerSideCopyWebContext(self.app, self.logger)
|
||||
ssc_ctx = ServerSideCopyWebContext(self.app, self.logger,
|
||||
self.yield_frequency)
|
||||
source_resp = self._get_source_object(ssc_ctx, source_path, req)
|
||||
if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
|
||||
return source_resp(source_resp.environ, start_response)
|
||||
@@ -434,6 +509,17 @@ class ServerSideCopyMiddleware(object):
|
||||
source_resp, sink_req)
|
||||
|
||||
put_resp = ssc_ctx.send_put_req(sink_req, resp_headers, start_response)
|
||||
|
||||
# For heartbeat=on, we need to cleanup the resp iter
|
||||
if config_true_value(req.params.get('heartbeat')):
|
||||
def clean_iter(app_iter):
|
||||
try:
|
||||
for chunk in app_iter:
|
||||
yield chunk
|
||||
finally:
|
||||
close_if_possible(source_resp.app_iter)
|
||||
return clean_iter(put_resp)
|
||||
|
||||
close_if_possible(source_resp.app_iter)
|
||||
return put_resp
|
||||
|
||||
|
@@ -366,13 +366,12 @@ from swift.common.registry import register_swift_info
|
||||
from swift.common.request_helpers import SegmentedIterable, \
|
||||
get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \
|
||||
get_container_update_override_key, update_ignore_range_header, \
|
||||
get_param, get_valid_part_num
|
||||
get_param, get_valid_part_num, get_heartbeat_response_body
|
||||
from swift.common.constraints import check_utf8
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest, make_env, \
|
||||
make_pre_authed_request
|
||||
from swift.common.middleware.bulk import get_response_body, \
|
||||
ACCEPTABLE_FORMATS, Bulk
|
||||
from swift.common.middleware.bulk import ACCEPTABLE_FORMATS, Bulk
|
||||
from swift.obj import expirer
|
||||
from swift.proxy.controllers.base import get_container_info
|
||||
|
||||
@@ -1527,7 +1526,7 @@ class StaticLargeObject(object):
|
||||
start_response(err.status,
|
||||
[(h, v) for h, v in err.headers.items()
|
||||
if h.lower() != 'content-length'])
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, problem_segments, 'upload')
|
||||
return
|
||||
|
||||
@@ -1554,7 +1553,7 @@ class StaticLargeObject(object):
|
||||
err_body = err_body.decode('utf-8', errors='replace')
|
||||
resp_dict['Response Body'] = err_body or '\n'.join(
|
||||
RESPONSE_REASONS.get(err.status_int, ['']))
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, problem_segments,
|
||||
'upload')
|
||||
else:
|
||||
@@ -1600,7 +1599,7 @@ class StaticLargeObject(object):
|
||||
if isinstance(resp_body, bytes):
|
||||
resp_body = resp_body.decode('utf-8')
|
||||
resp_dict['Response Body'] = resp_body
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, [], 'upload')
|
||||
else:
|
||||
for chunk in resp(req.environ, start_response):
|
||||
|
@@ -22,6 +22,8 @@ from swob in here without creating circular imports.
|
||||
|
||||
import itertools
|
||||
import time
|
||||
import json
|
||||
from xml.sax.saxutils import escape # nosec B406
|
||||
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
|
||||
@@ -1005,3 +1007,47 @@ def append_log_info(environ, log_info):
|
||||
|
||||
def get_log_info(environ):
|
||||
return ','.join(environ.get('swift.log_info', []))
|
||||
|
||||
|
||||
def get_heartbeat_response_body(data_format, data_dict, error_list, root_tag):
|
||||
"""
|
||||
Returns a response body for heartbeat according to format.
|
||||
|
||||
Handles json and xml, otherwise will return text/plain.
|
||||
Note: xml response does not include xml declaration.
|
||||
|
||||
:params data_format: resulting format
|
||||
:params data_dict: generated data about results.
|
||||
:params error_list: list of quoted filenames that failed
|
||||
:params root_tag: the tag name to use for root elements when returning XML;
|
||||
e.g. 'extract' or 'delete'
|
||||
"""
|
||||
if data_format == 'application/json':
|
||||
data_dict['Errors'] = error_list
|
||||
return json.dumps(data_dict).encode('ascii')
|
||||
if data_format and data_format.endswith('/xml'):
|
||||
output = ['<', root_tag, '>\n']
|
||||
for key in sorted(data_dict):
|
||||
xml_key = key.replace(' ', '_').lower()
|
||||
output.extend([
|
||||
'<', xml_key, '>',
|
||||
escape(str(data_dict[key])),
|
||||
'</', xml_key, '>\n',
|
||||
])
|
||||
output.append('<errors>\n')
|
||||
for name, status in error_list:
|
||||
output.extend([
|
||||
'<object><name>', escape(name), '</name><status>',
|
||||
escape(status), '</status></object>\n',
|
||||
])
|
||||
output.extend(['</errors>\n</', root_tag, '>\n'])
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
||||
output = []
|
||||
for key in sorted(data_dict):
|
||||
output.append('%s: %s\n' % (key, data_dict[key]))
|
||||
output.append('Errors:\n')
|
||||
output.extend(
|
||||
'%s, %s\n' % (name, status)
|
||||
for name, status in error_list)
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
@@ -645,11 +645,11 @@ class TestUntar(unittest.TestCase):
|
||||
])
|
||||
|
||||
def test_get_response_body(self):
|
||||
txt_body = bulk.get_response_body(
|
||||
txt_body = bulk.get_heartbeat_response_body(
|
||||
'bad_formay', {'hey': 'there'}, [['json > xml', '202 Accepted']],
|
||||
"doesn't matter for text")
|
||||
self.assertIn(b'hey: there', txt_body)
|
||||
xml_body = bulk.get_response_body(
|
||||
xml_body = bulk.get_heartbeat_response_body(
|
||||
'text/xml', {'hey': 'there'}, [['json > xml', '202 Accepted']],
|
||||
'root_tag')
|
||||
self.assertIn(b'>', xml_body)
|
||||
|
@@ -17,12 +17,13 @@
|
||||
from unittest import mock
|
||||
import unittest
|
||||
import urllib.parse
|
||||
import eventlet
|
||||
|
||||
from swift.common import swob
|
||||
from swift.common.middleware import copy
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.swob import Request, HTTPException
|
||||
from swift.common.utils import closing_if_possible, md5
|
||||
from swift.common.utils import close_if_possible, closing_if_possible, md5
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import patch_policies, FakeRing
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
@@ -1426,3 +1427,141 @@ class TestServerSideCopyMiddlewareWithEC(unittest.TestCase):
|
||||
self.assertEqual(resp.body, range_not_satisfiable_body)
|
||||
self.assertEqual(resp.etag, body_etag)
|
||||
self.assertEqual(resp.headers['Accept-Ranges'], 'bytes')
|
||||
|
||||
|
||||
class TestServerSideCopyHeartbeat(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.app = FakeSwift()
|
||||
self.ssc = copy.filter_factory({'yield_frequency': '1'})(self.app)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def call_app(self, req, app=None):
|
||||
if app is None:
|
||||
app = self.app
|
||||
|
||||
self.authorized = []
|
||||
|
||||
def authorize(req):
|
||||
self.authorized.append(req)
|
||||
|
||||
if 'swift.authorize' not in req.environ:
|
||||
req.environ['swift.authorize'] = authorize
|
||||
|
||||
req.headers.setdefault("User-Agent", "Test User Agent")
|
||||
|
||||
status = [None]
|
||||
headers = [None]
|
||||
|
||||
def start_response(s, h, ei=None):
|
||||
status[0] = s
|
||||
headers[0] = h
|
||||
|
||||
body_iter = app(req.environ, start_response)
|
||||
body = b''
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
finally:
|
||||
close_if_possible(body_iter)
|
||||
|
||||
return status[0], headers[0], body
|
||||
|
||||
def test_copy_with_heartbeat_success(self):
|
||||
original_spawn = eventlet.spawn
|
||||
self.app.register('GET', '/v1/a/c/o?heartbeat=true', swob.HTTPOk,
|
||||
{'Content-Length': '10'}, b'X' * 10)
|
||||
self.app.register('PUT', '/v1/a/c/o2?heartbeat=true',
|
||||
swob.HTTPCreated, {})
|
||||
heartbeats = []
|
||||
|
||||
def mock_spawn(func, *args, **kwargs):
|
||||
def delayed_func(*a, **kw):
|
||||
eventlet.sleep(2.5)
|
||||
return func(*a, **kw)
|
||||
return original_spawn(delayed_func, *args, **kwargs)
|
||||
req = swob.Request.blank(
|
||||
'/v1/a/c/o2?heartbeat=true',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '0', 'X-Copy-From': 'c/o'})
|
||||
|
||||
with mock.patch('eventlet.spawn', mock_spawn):
|
||||
status = [None]
|
||||
headers_list = [None]
|
||||
|
||||
def start_response(s, h, ei=None):
|
||||
status[0] = s
|
||||
headers_list[0] = h
|
||||
body_iter = self.ssc(req.environ, start_response)
|
||||
self.assertEqual('202 Accepted', status[0])
|
||||
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
heartbeats.append(chunk)
|
||||
finally:
|
||||
close_if_possible(body_iter)
|
||||
self.assertTrue(len(heartbeats) >= 3,
|
||||
f"Expected 3 heartbeats, got {len(heartbeats)}")
|
||||
self.assertEqual(heartbeats[0], b' ')
|
||||
|
||||
for i in range(1, len(heartbeats) - 1):
|
||||
self.assertEqual(heartbeats[i], b' ')
|
||||
self.assertIn(b'201 Created', heartbeats[-1])
|
||||
self.assertEqual(req.environ.get('eventlet.minimum_write_chunk_size'),
|
||||
0)
|
||||
self.assertEqual(2, len(self.app.calls))
|
||||
self.assertEqual(('GET', '/v1/a/c/o?heartbeat=true'),
|
||||
self.app.calls[0])
|
||||
self.assertEqual(('PUT', '/v1/a/c/o2?heartbeat=true'),
|
||||
self.app.calls[1])
|
||||
|
||||
def test_copy_with_heartbeat_failure(self):
|
||||
original_spawn = eventlet.spawn
|
||||
self.app.register('GET', '/v1/a/c/o?heartbeat=true', swob.HTTPOk,
|
||||
{'Content-Length': '10'}, b'X' * 10)
|
||||
self.app.register('PUT', '/v1/a/c/o2?heartbeat=true',
|
||||
swob.HTTPServiceUnavailable, {})
|
||||
heartbeats = []
|
||||
|
||||
def mock_spawn(func, *args, **kwargs):
|
||||
|
||||
def delayed_func(*a, **kw):
|
||||
eventlet.sleep(2.5)
|
||||
return func(*a, **kw)
|
||||
return original_spawn(delayed_func, *args, **kwargs)
|
||||
|
||||
req = swob.Request.blank(
|
||||
'/v1/a/c/o2?heartbeat=true',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '0', 'X-Copy-From': 'c/o'})
|
||||
|
||||
with mock.patch('eventlet.spawn', mock_spawn):
|
||||
status = [None]
|
||||
headers_list = [None]
|
||||
|
||||
def start_response(s, h, ei=None):
|
||||
status[0] = s
|
||||
headers_list[0] = h
|
||||
body_iter = self.ssc(req.environ, start_response)
|
||||
self.assertEqual('202 Accepted', status[0])
|
||||
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
heartbeats.append(chunk)
|
||||
finally:
|
||||
close_if_possible(body_iter)
|
||||
self.assertTrue(len(heartbeats) >= 3,
|
||||
f"Expected 3 heartbeats, got {len(heartbeats)}")
|
||||
self.assertEqual(heartbeats[0], b' ')
|
||||
|
||||
for i in range(1, len(heartbeats) - 1):
|
||||
self.assertEqual(heartbeats[i], b' ')
|
||||
self.assertIn(b'503 Service Unavailable', heartbeats[-1])
|
||||
self.assertEqual(req.environ.get('eventlet.minimum_write_chunk_size'),
|
||||
0)
|
||||
self.assertEqual(2, len(self.app.calls))
|
||||
self.assertEqual(('GET', '/v1/a/c/o?heartbeat=true'),
|
||||
self.app.calls[0])
|
||||
self.assertEqual(('PUT', '/v1/a/c/o2?heartbeat=true'),
|
||||
self.app.calls[1])
|
||||
|
Reference in New Issue
Block a user