From fb08d477eb7bf5e678b9cd99b44a435842a7dfbf Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Thu, 1 Oct 2020 14:28:04 -0500 Subject: [PATCH] New proxy logging field for wire status Capture the on-the-wire status code for logging, because the logged status code is sometimes changed and no longer matches what was sent to the client. Closes-Bug: #1896518 Change-Id: I27feabe923a6520e983637a9c68a19ec7174a0df --- doc/source/logs.rst | 3 + swift/common/middleware/proxy_logging.py | 78 ++++++++++--------- .../versioned_writes/object_versioning.py | 6 +- swift/common/utils.py | 26 ++++--- swift/proxy/controllers/obj.py | 9 ++- test/unit/common/middleware/helpers.py | 15 +++- .../common/middleware/test_proxy_logging.py | 40 +++++++++- test/unit/common/test_utils.py | 17 ++++ 8 files changed, 136 insertions(+), 58 deletions(-) diff --git a/doc/source/logs.rst b/doc/source/logs.rst index 8f53dfd82a..e70de1f8ed 100644 --- a/doc/source/logs.rst +++ b/doc/source/logs.rst @@ -93,6 +93,9 @@ container The container part extracted from the path of the request. object The object part extracted from the path of the request. (anonymizable) pid PID of the process emitting the log line. +wire_status_int The status sent to the client, which may be different than + the logged response code if there was an error during the + body of the request or a disconnect. =================== ========================================================== In one log line, all of the above fields are space-separated and url-encoded. diff --git a/swift/common/middleware/proxy_logging.py b/swift/common/middleware/proxy_logging.py index fd7b0d7ea4..f774afb203 100644 --- a/swift/common/middleware/proxy_logging.py +++ b/swift/common/middleware/proxy_logging.py @@ -74,9 +74,10 @@ bandwidth usage will want to only sum up logs with no swift.source. 
import os import time +from swift.common.middleware.catch_errors import enforce_byte_count from swift.common.swob import Request from swift.common.utils import (get_logger, get_remote_client, - config_true_value, + config_true_value, reiterate, InputProxy, list_from_csv, get_policy_index, split_path, StrAnonymizer, StrFormatTime, LogStringFormatter) @@ -176,7 +177,8 @@ class ProxyLoggingMiddleware(object): 'log_info': '', 'policy_index': '', 'ttfb': '0.05', - 'pid': '42' + 'pid': '42', + 'wire_status_int': '200', } try: self.log_formatter.format(self.log_msg_template, **replacements) @@ -198,7 +200,8 @@ class ProxyLoggingMiddleware(object): return value def log_request(self, req, status_int, bytes_received, bytes_sent, - start_time, end_time, resp_headers=None, ttfb=0): + start_time, end_time, resp_headers=None, ttfb=0, + wire_status_int=None): """ Log a request. @@ -209,6 +212,7 @@ class ProxyLoggingMiddleware(object): :param start_time: timestamp request started :param end_time: timestamp request completed :param resp_headers: dict of the response headers + :param wire_status_int: the on the wire status int """ resp_headers = resp_headers or {} logged_headers = None @@ -277,6 +281,7 @@ class ProxyLoggingMiddleware(object): 'policy_index': policy_index, 'ttfb': ttfb, 'pid': self.pid, + 'wire_status_int': wire_status_int or status_int, } self.access_logger.info( self.log_formatter.format(self.log_msg_template, @@ -352,47 +357,46 @@ class ProxyLoggingMiddleware(object): def my_start_response(status, headers, exc_info=None): start_response_args[0] = (status, list(headers), exc_info) - def status_int_for_logging(client_disconnect=False, start_status=None): + def status_int_for_logging(start_status, client_disconnect=False): # log disconnected clients as '499' status code if client_disconnect or input_proxy.client_disconnect: - ret_status_int = 499 - elif start_status is None: - ret_status_int = int( - start_response_args[0][0].split(' ', 1)[0]) - else: - 
ret_status_int = start_status - return ret_status_int + return 499 + return start_status def iter_response(iterable): - iterator = iter(iterable) - try: - chunk = next(iterator) - while not chunk: - chunk = next(iterator) - except StopIteration: - chunk = b'' + iterator = reiterate(iterable) + content_length = None for h, v in start_response_args[0][1]: - if h.lower() in ('content-length', 'transfer-encoding'): + if h.lower() == 'content-length': + content_length = int(v) + break + elif h.lower() == 'transfer-encoding': break else: - if not chunk: - start_response_args[0][1].append(('Content-Length', '0')) - elif isinstance(iterable, list): + if isinstance(iterator, list): + content_length = sum(len(i) for i in iterator) start_response_args[0][1].append( - ('Content-Length', str(sum(len(i) for i in iterable)))) + ('Content-Length', str(content_length))) + + req = Request(env) + method = self.method_from_req(req) + if method == 'HEAD': + content_length = 0 + if content_length is not None: + iterator = enforce_byte_count(iterator, content_length) + + wire_status_int = int(start_response_args[0][0].split(' ', 1)[0]) resp_headers = dict(start_response_args[0][1]) start_response(*start_response_args[0]) - req = Request(env) # Log timing information for time-to-first-byte (GET requests only) - method = self.method_from_req(req) ttfb = 0.0 if method == 'GET': - status_int = status_int_for_logging() policy_index = get_policy_index(req.headers, resp_headers) - metric_name = self.statsd_metric_name(req, status_int, method) + metric_name = self.statsd_metric_name( + req, wire_status_int, method) metric_name_policy = self.statsd_metric_name_policy( - req, status_int, method, policy_index) + req, wire_status_int, method, policy_index) ttfb = time.time() - start_time if metric_name: self.access_logger.timing( @@ -403,31 +407,33 @@ class ProxyLoggingMiddleware(object): bytes_sent = 0 client_disconnect = False + start_status = wire_status_int try: - while chunk: + for chunk in 
iterator: bytes_sent += len(chunk) yield chunk - chunk = next(iterator) except StopIteration: # iterator was depleted return except GeneratorExit: # generator was closed before we finished client_disconnect = True raise + except Exception: + start_status = 500 + raise finally: - status_int = status_int_for_logging(client_disconnect) + status_int = status_int_for_logging( + start_status, client_disconnect) self.log_request( req, status_int, input_proxy.bytes_received, bytes_sent, start_time, time.time(), resp_headers=resp_headers, - ttfb=ttfb) - close_method = getattr(iterable, 'close', None) - if callable(close_method): - close_method() + ttfb=ttfb, wire_status_int=wire_status_int) + iterator.close() try: iterable = self.app(env, my_start_response) except Exception: req = Request(env) - status_int = status_int_for_logging(start_status=500) + status_int = status_int_for_logging(500) self.log_request( req, status_int, input_proxy.bytes_received, 0, start_time, time.time()) diff --git a/swift/common/middleware/versioned_writes/object_versioning.py b/swift/common/middleware/versioned_writes/object_versioning.py index 508972f72e..26dcb5c546 100644 --- a/swift/common/middleware/versioned_writes/object_versioning.py +++ b/swift/common/middleware/versioned_writes/object_versioning.py @@ -329,16 +329,17 @@ class ObjectContext(ObjectVersioningContext): # do the write put_resp = put_req.get_response(self.app) - drain_and_close(put_resp) close_if_possible(put_req.environ['wsgi.input']) if put_resp.status_int == HTTP_NOT_FOUND: + drain_and_close(put_resp) raise HTTPInternalServerError( request=req, content_type='text/plain', body=b'The versions container does not exist. You may ' b'want to re-enable object versioning.') self._check_response_error(req, put_resp) + drain_and_close(put_resp) put_bytes = byte_counter.bytes_read # N.B. 
this is essentially the same hack that symlink does in # _validate_etag_and_update_sysmeta to deal with SLO @@ -392,12 +393,13 @@ class ObjectContext(ObjectVersioningContext): """ if is_success(resp.status_int): return + body = resp.body drain_and_close(resp) if is_client_error(resp.status_int): # missing container or bad permissions if resp.status_int == 404: raise HTTPPreconditionFailed(request=req) - raise HTTPException(body=resp.body, status=resp.status, + raise HTTPException(body=body, status=resp.status, headers=resp.headers) # could not version the data, bail raise HTTPServiceUnavailable(request=req) diff --git a/swift/common/utils.py b/swift/common/utils.py index 83417ff9ea..6f7c64b422 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3981,23 +3981,26 @@ class CloseableChain(object): """ def __init__(self, *iterables): self.iterables = iterables + self.chained_iter = itertools.chain(*self.iterables) def __iter__(self): - return iter(itertools.chain(*(self.iterables))) + return self + + def __next__(self): + return next(self.chained_iter) + + next = __next__ # py2 def close(self): for it in self.iterables: - close_method = getattr(it, 'close', None) - if close_method: - close_method() + close_if_possible(it) def reiterate(iterable): """ - Consume the first item from an iterator, then re-chain it to the rest of - the iterator. This is useful when you want to make sure the prologue to - downstream generators have been executed before continuing. - + Consume the first truthy item from an iterator, then re-chain it to the + rest of the iterator. This is useful when you want to make sure the + prologue to downstream generators have been executed before continuing. 
:param iterable: an iterable object """ if isinstance(iterable, (list, tuple)): @@ -4005,12 +4008,13 @@ def reiterate(iterable): else: iterator = iter(iterable) try: - chunk = '' + chunk = next(iterator) while not chunk: chunk = next(iterator) return CloseableChain([chunk], iterator) except StopIteration: - return [] + close_if_possible(iterable) + return iter([]) class InputProxy(object): @@ -4311,6 +4315,8 @@ def drain_and_close(response_or_app_iter): don't care about the body of an error. """ app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter) + if app_iter is None: # for example, if we used the Response.body property + return for _chunk in app_iter: pass close_if_possible(app_iter) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index cd35973634..67b9a8b760 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -1063,7 +1063,7 @@ class ECAppIter(object): # cleanup the frag queue feeding coros that may be currently # executing the internal_parts_iters. 
if self.stashed_iter: - self.stashed_iter.close() + close_if_possible(self.stashed_iter) sleep() # Give the per-frag threads a chance to clean up for it in self.internal_parts_iters: close_if_possible(it) @@ -1200,10 +1200,15 @@ class ECAppIter(object): def __iter__(self): if self.stashed_iter is not None: - return iter(self.stashed_iter) + return self else: raise ValueError("Failed to call kickoff() before __iter__()") + def __next__(self): + return next(self.stashed_iter) + + next = __next__ # py2 + def _real_iter(self, req, resp_headers): if not self.range_specs: client_asked_for_range = False diff --git a/test/unit/common/middleware/helpers.py b/test/unit/common/middleware/helpers.py index f299aa67c4..1eb43239e9 100644 --- a/test/unit/common/middleware/helpers.py +++ b/test/unit/common/middleware/helpers.py @@ -32,15 +32,22 @@ class LeakTrackingIter(object): def __init__(self, inner_iter, mark_closed, mark_read, key): if isinstance(inner_iter, bytes): inner_iter = (inner_iter, ) - self.inner_iter = inner_iter + self.inner_iter = iter(inner_iter) self.mark_closed = mark_closed self.mark_read = mark_read self.key = key def __iter__(self): - for x in self.inner_iter: - yield x - self.mark_read(self.key) + return self + + def __next__(self): + try: + return next(self.inner_iter) + except StopIteration: + self.mark_read(self.key) + raise + + next = __next__ # for py2 def close(self): self.mark_closed(self.key) diff --git a/test/unit/common/middleware/test_proxy_logging.py b/test/unit/common/middleware/test_proxy_logging.py index 494eec92a2..6e7b23f504 100644 --- a/test/unit/common/middleware/test_proxy_logging.py +++ b/test/unit/common/middleware/test_proxy_logging.py @@ -51,8 +51,10 @@ class FakeApp(object): except ValueError: is_container_or_object_req = False - headers = [('Content-Type', 'text/plain'), - ('Content-Length', str(sum(map(len, self.body))))] + headers = [('Content-Type', 'text/plain')] + if not hasattr(self.body, 'close'): + content_length = 
sum(map(len, self.body)) + headers.append(('Content-Length', str(content_length))) if is_container_or_object_req and self.policy_idx is not None: headers.append(('X-Backend-Storage-Policy-Index', str(self.policy_idx))) @@ -612,13 +614,22 @@ class TestProxyLogging(unittest.TestCase): class CloseableBody(object): def __init__(self): + self.msg = b"CloseableBody" self.closed = False def close(self): self.closed = True def __iter__(self): - return iter(["CloseableBody"]) + return self + + def __next__(self): + if not self.msg: + raise StopIteration + result, self.msg = self.msg, b'' + return result + + next = __next__ # py2 body = CloseableBody() app = proxy_logging.ProxyLoggingMiddleware(FakeApp(body), {}) @@ -682,6 +693,27 @@ class TestProxyLogging(unittest.TestCase): self.assertEqual(log_parts[6], '499') self.assertEqual(log_parts[11], '4') # write length + def test_exploding_body(self): + + def exploding_body(): + yield 'some' + yield 'stuff' + raise Exception('kaboom!') + + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(exploding_body()), { + 'log_msg_template': '{method} {path} ' + '{status_int} {wire_status_int}', + }) + app.access_logger = FakeLogger() + req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(app) + with self.assertRaises(Exception) as ctx: + resp.body + self.assertEqual('kaboom!', str(ctx.exception)) + log_parts = self._log_parts(app) + self.assertEqual(log_parts, ['GET', '/', '500', '200']) + def test_disconnect_on_readline(self): app = proxy_logging.ProxyLoggingMiddleware(FakeAppReadline(), {}) app.access_logger = FakeLogger() @@ -748,7 +780,7 @@ class TestProxyLogging(unittest.TestCase): app = proxy_logging.ProxyLoggingMiddleware( FakeAppNoContentLengthNoTransferEncoding( # test the "while not chunk: chunk = next(iterator)" - body=['', '', ''], + body=[b'', b'', b''], ), {}) app.access_logger = FakeLogger() req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'}) diff --git 
a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index de4aef66fe..ef9913b936 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1247,6 +1247,23 @@ class TestUtils(unittest.TestCase): else: os.environ.pop('TZ') + def test_drain_and_close(self): + utils.drain_and_close([]) + utils.drain_and_close(iter([])) + drained = [False] + + def gen(): + yield 'x' + yield 'y' + drained[0] = True + + utils.drain_and_close(gen()) + self.assertTrue(drained[0]) + utils.drain_and_close(Response(status=200, body=b'Some body')) + drained = [False] + utils.drain_and_close(Response(status=200, app_iter=gen())) + self.assertTrue(drained[0]) + def test_backwards(self): # Test swift.common.utils.backward