Object-server: add labeled timing metrics for object REPLICATE request

Co-Authored-By: Shreeya Deshpande <shreeyad@nvidia.com>
Change-Id: I155572bc5f4d955c918c951d6d2af9dbbae0136d
This commit is contained in:
Yan Xiao
2025-03-12 15:59:35 -04:00
parent 076771462c
commit ab6e05922f
4 changed files with 301 additions and 10 deletions

View File

@@ -986,6 +986,34 @@ class GreenthreadSafeIterator(object):
return next(self.unsafe_iter)
def labeled_timing_stats(metric, **dec_kwargs):
    """
    Build a decorator that reports a labeled timing metric for a controller
    method.

    The wrapped method is invoked with an extra ``timing_stats_labels``
    keyword argument: a dict the method may fill with additional labels.
    Once the method returns, the ``method`` and ``status`` labels are written
    last, so whatever the method stored under (or removed from) those two
    keys is overwritten, and ``ctrl.statsd.timing_since`` is called with the
    collected labels. Nothing is emitted if the wrapped method raises.

    :param metric: name of the timing metric to emit.
    :param dec_kwargs: extra keyword arguments forwarded unchanged to
        ``ctrl.statsd.timing_since`` (for example ``sample_rate``).
    :returns: a decorator for methods called as ``(ctrl, req, ...)``.
    """
    def decorating_func(func):
        @functools.wraps(func)
        def _timing_stats(ctrl, req, *args, **kwargs):
            started_at = time.time()
            # Remember the verb up front: the wrapped method is free to
            # mutate ``req`` before it returns.
            original_method = req.method
            collected = {}
            resp = func(ctrl, req, *args,
                        timing_stats_labels=collected, **kwargs)
            # Reserved labels are assigned after the call so the wrapped
            # method can neither override nor drop them.
            collected.update(method=original_method,
                             status=resp.status_int)
            ctrl.statsd.timing_since(
                metric, started_at, labels=collected, **dec_kwargs)
            return resp
        return _timing_stats
    return decorating_func
def timing_stats(**dec_kwargs):
"""
Returns a decorator that logs timing events or errors for public methods in

View File

@@ -29,9 +29,9 @@ from eventlet import sleep, wsgi, Timeout, tpool
from eventlet.greenthread import spawn
from swift.common.utils import public, get_logger, \
config_true_value, config_percent_value, timing_stats, replication, \
normalize_delete_at_timestamp, get_log_line, Timestamp, \
parse_mime_headers, \
config_true_value, config_percent_value, timing_stats, \
labeled_timing_stats, replication, normalize_delete_at_timestamp, \
get_log_line, Timestamp, parse_mime_headers, \
iter_multipart_mime_documents, extract_swift_bytes, safe_json_loads, \
config_auto_int_value, split_path, get_redirect_data, \
normalize_timestamp, md5, parse_options, CooperativeIterator
@@ -52,6 +52,7 @@ from swift.common.request_helpers import get_name_and_placement, \
is_user_meta, is_sys_or_user_meta, is_object_transient_sysmeta, \
resolve_etag_is_at_header, is_sys_meta, validate_internal_obj, \
is_backend_open_expired
from swift.common.statsd_client import get_labeled_statsd_client
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
@@ -64,6 +65,9 @@ from swift.obj.expirer import build_task_obj, embed_expirer_bytes_in_ctype, \
X_DELETE_TYPE
# Name of the labeled timing metric for object-server requests; passed as the
# ``metric`` argument of the ``labeled_timing_stats`` decorator.
LABELED_METRIC_NAME = 'swift_object_server_request_timing'
def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size):
mime_documents_iter = iter_multipart_mime_documents(
wsgi_input, mime_boundary, read_chunk_size)
@@ -141,6 +145,7 @@ class ObjectController(BaseStorageServer):
"""
super(ObjectController, self).__init__(conf)
self.logger = logger or get_logger(conf, log_route='object-server')
self.statsd = get_labeled_statsd_client(conf, self.logger.logger)
self.node_timeout = float(conf.get('node_timeout', 3))
self.container_update_timeout = float(
conf.get('container_update_timeout', 1))
@@ -1313,7 +1318,8 @@ class ObjectController(BaseStorageServer):
@public
@replication
@timing_stats(sample_rate=0.1)
def REPLICATE(self, request):
@labeled_timing_stats(metric=LABELED_METRIC_NAME, sample_rate=0.1)
def REPLICATE(self, request, timing_stats_labels):
"""
Handle REPLICATE requests for the Swift Object Server. This is used
by the object replicator to get hashes for directories.
@@ -1325,10 +1331,14 @@ class ObjectController(BaseStorageServer):
device, partition, suffix_parts, policy = \
get_name_and_placement(request, 2, 3, True)
suffixes = suffix_parts.split('-') if suffix_parts else []
skip_rehash = bool(suffixes)
timing_stats_labels['policy'] = int(policy)
timing_stats_labels['skip_rehash'] = skip_rehash
try:
hashes = self._diskfile_router[policy].get_hashes(
device, partition, suffixes, policy,
skip_rehash=bool(suffixes))
skip_rehash=skip_rehash)
except DiskFileDeviceUnavailable:
resp = HTTPInsufficientStorage(drive=device, request=request)
else:

View File

@@ -21,7 +21,9 @@ import io
import itertools
from swift.common.statsd_client import StatsdClient
from test.debug_logger import debug_logger, FakeStatsdClient
from swift.common.swob import Request
from test.debug_logger import debug_logger, FakeStatsdClient, \
debug_labeled_statsd_client
from test.unit import temptree, make_timestamp_iter, with_tempdir, \
mock_timestamp_now, FakeIterable
@@ -7981,6 +7983,146 @@ class TestLoggerStatsdClientDelegation(unittest.TestCase):
msgs)
class MockLabeledTimingController(object):
    """
    Minimal controller stand-in for exercising
    ``utils.labeled_timing_stats``.

    :param status: status handed to ``Response`` by ``handle_req``.
    :param extra_labels: optional dict merged into the labels dict that the
        decorator passes to ``handle_req``.
    """

    def __init__(self, status, extra_labels=None):
        self.status = status
        self.extra_labels = extra_labels if extra_labels else {}
        self.statsd = debug_labeled_statsd_client({})

    def _update_labels(self, req, labels):
        """Merge the configured extra labels into ``labels`` in place."""
        labels.update(self.extra_labels)

    @utils.labeled_timing_stats(metric='my_timing_metric')
    def handle_req(self, req, timing_stats_labels):
        """Update the labels, then answer with the configured status."""
        self._update_labels(req, timing_stats_labels)
        return Response(status=self.status)
class TestLabeledTimingStatsDecorator(unittest.TestCase):
    """
    Tests for ``utils.labeled_timing_stats`` driven through
    ``MockLabeledTimingController.handle_req``.
    """

    @contextlib.contextmanager
    def _patch_time(self):
        """Pin ``swift.common.utils.time.time`` and yield the pinned value."""
        frozen = time.time()
        patcher = mock.patch(
            'swift.common.utils.time.time', return_value=frozen)
        with patcher:
            yield frozen

    def _handle(self, controller, req):
        """Run ``handle_req`` under the time patch; return the pinned time."""
        with self._patch_time() as now:
            controller.handle_req(req)
        return now

    @staticmethod
    def _expected_calls(now, **labels):
        """Build the single ``timing_since`` call expected on the client."""
        return {'timing_since': [
            (('my_timing_metric', now), {'labels': labels})]}

    def test_labeled_timing_stats_get_200(self):
        req = Request.blank('/v1/a/c/o')
        controller = MockLabeledTimingController(200)
        now = self._handle(controller, req)
        self.assertEqual(
            self._expected_calls(now, method='GET', status=200),
            controller.statsd.calls)

    def test_labeled_timing_stats_head_500(self):
        req = Request.blank('/v1/a/c/o', method='HEAD')
        controller = MockLabeledTimingController(500)
        now = self._handle(controller, req)
        self.assertEqual(
            self._expected_calls(now, method='HEAD', status=500),
            controller.statsd.calls)

    def test_labeled_timing_stats_extra_labels(self):
        req = Request.blank('/v1/AUTH_test/c/o')
        controller = MockLabeledTimingController(
            206, extra_labels={'account': 'AUTH_test'})
        now = self._handle(controller, req)
        self.assertEqual(
            self._expected_calls(
                now, account='AUTH_test', method='GET', status=206),
            controller.statsd.calls)

    def test_labeled_timing_stats_can_not_override_status(self):
        req = Request.blank('/v1/AUTH_test/c/o')
        # the controller tries to claim a 200 while responding 404
        controller = MockLabeledTimingController(
            404, extra_labels={'status': 200})
        now = self._handle(controller, req)
        self.assertEqual(
            self._expected_calls(now, method='GET', status=404),
            controller.statsd.calls)

    def test_labeled_timing_stats_can_not_override_method(self):
        req = Request.blank('/v1/AUTH_test/c/o', method='POST')
        # the controller tries to claim a GET for a POST request
        controller = MockLabeledTimingController(
            412, extra_labels={'method': 'GET'})
        now = self._handle(controller, req)
        self.assertEqual(
            self._expected_calls(now, method='POST', status=412),
            controller.statsd.calls)

    def test_labeled_timing_stats_really_can_not_override_method(self):
        class MutilatingController(MockLabeledTimingController):
            def _update_labels(self, req, labels):
                # rewrite the verb on the request itself
                req.method = 'BANANA'

        req = Request.blank('/v1/AUTH_test/c/o', method='POST')
        controller = MutilatingController(412)
        now = self._handle(controller, req)
        self.assertEqual('BANANA', req.method)
        self.assertEqual(
            self._expected_calls(now, method='POST', status=412),
            controller.statsd.calls)

    def test_labeled_timing_stats_cannot_remove_labels(self):
        class MutilatingController(MockLabeledTimingController):
            def _update_labels(self, req, labels):
                labels.clear()

        req = Request.blank('/v1/AUTH_test/c/o', method='DELETE')
        controller = MutilatingController('42 bad stuff')
        now = self._handle(controller, req)
        self.assertEqual(
            # resp.status_int knows how to turn '42 bad stuff' into 42
            self._expected_calls(now, method='DELETE', status=42),
            controller.statsd.calls)
class TestTimingStatsDecorators(unittest.TestCase):
def test_timing_stats(self):
class MockController(object):

View File

@@ -41,7 +41,8 @@ from swift import __version__ as swift_version
from swift.common.http import is_success
from swift.obj.expirer import ExpirerConfig
from test import listen_zero, BaseTestCase
from test.debug_logger import debug_logger
from test.debug_logger import debug_logger, FakeStatsdClient, \
FakeLabeledStatsdClient
from test.unit import mocked_http_conn, \
make_timestamp_iter, DEFAULT_TEST_EC_TYPE, skip_if_no_xattrs, \
connect_tcp, readuntil2crlfs, patch_policies, encode_frag_archive_bodies, \
@@ -55,6 +56,7 @@ from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
Timestamp, md5
from swift.common import constraints
from swift.common.request_helpers import get_reserved_name
from swift.common.statsd_client import LabeledStatsdClient
from swift.common.swob import Request, WsgiBytesIO, \
HTTPRequestedRangeNotSatisfiable
from swift.common.splice import splice
@@ -191,10 +193,15 @@ class TestObjectController(BaseTestCase):
'mount_check': 'false',
'container_update_timeout': 0.0,
}
self.logger.clear()
app = object_server.ObjectController(conf, logger=self.logger)
self.assertEqual(app.container_update_timeout, 0.0)
self.assertEqual(app.auto_create_account_prefix, '.')
self.assertEqual(self.logger.get_lines_for_level('warning'), [])
self.assertIsInstance(app.statsd, LabeledStatsdClient)
self.assertEqual({
'debug': [
'Labeled statsd mode: disabled (test-object-controller)'
]}, self.logger.all_log_lines())
def check_all_api_methods(self, obj_name='o', alt_res=None):
path = '/sda1/p/a/c/%s' % obj_name
@@ -221,6 +228,110 @@ class TestObjectController(BaseTestCase):
if out_body and (200 <= res < 300):
self.assertEqual(resp.body, out_body)
def _do_test_timing_stats(self, conf, req, now):
    """
    Send ``req`` to a fresh ObjectController built with fake statsd clients.

    ``time.time`` in ``swift.common.utils`` is patched to return ``now`` and
    both clients' ``random`` is patched to return 0 while the request runs.

    :param conf: conf dict used to construct the controller.
    :param req: the request to send to the controller.
    :param now: value returned by the patched ``time.time``.
    :returns: tuple of (legacy statsd client, labeled statsd client).
    """
    patch_time = mock.patch(
        'swift.common.utils.time.time', return_value=now)
    patch_legacy = mock.patch(
        'swift.common.statsd_client.StatsdClient', FakeStatsdClient)
    patch_labeled = mock.patch(
        'swift.common.statsd_client.LabeledStatsdClient',
        FakeLabeledStatsdClient)
    with patch_time:
        with patch_legacy:
            with patch_labeled:
                app = object_server.ObjectController(
                    conf, logger=self.logger)
                statsd_client = app.logger.logger.statsd_client
                statsd = app.statsd
                with mock.patch.object(
                        statsd_client, 'random', return_value=0):
                    with mock.patch.object(
                            statsd, 'random', return_value=0):
                        req.call_application(app)
    self.assertIsInstance(statsd_client, FakeStatsdClient)
    self.assertIsInstance(statsd, FakeLabeledStatsdClient)
    return statsd_client, statsd
def test_legacy_and_labeled_timing_stats_replicate(self):
    """REPLICATE without suffixes emits legacy and labeled timings."""
    req = Request.blank(
        '/sda1/p/', environ={'REQUEST_METHOD': 'REPLICATE'})
    now = time()
    statsd_client, statsd = self._do_test_timing_stats(self.conf, req, now)

    expected_legacy = {'timing_since': [
        (('REPLICATE.timing', now), {'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_legacy, statsd_client.calls)

    expected_labels = {
        'method': 'REPLICATE',
        'policy': 0,
        'skip_rehash': False,
        'status': 200,
    }
    expected_labeled = {'timing_since': [
        (('swift_object_server_request_timing', now),
         {'labels': expected_labels, 'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_labeled, statsd.calls)
def test_legacy_and_labeled_timing_stats_replicate_skip_rehash(self):
    """Suffixes in the request path are reported as skip_rehash=True."""
    # suffixes in request path
    req = Request.blank(
        '/sda1/p/123-abc', environ={'REQUEST_METHOD': 'REPLICATE'})
    now = time()
    statsd_client, statsd = self._do_test_timing_stats(self.conf, req, now)

    expected_legacy = {'timing_since': [
        (('REPLICATE.timing', now), {'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_legacy, statsd_client.calls)

    expected_labels = {
        'method': 'REPLICATE',
        'policy': 0,
        'skip_rehash': True,
        'status': 200,
    }
    expected_labeled = {'timing_since': [
        (('swift_object_server_request_timing', now),
         {'labels': expected_labels, 'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_labeled, statsd.calls)
def test_legacy_and_labeled_timing_stats_replicate_policy(self):
    """The policy index from the backend header shows up as a label."""
    # non-default policy
    policy_headers = {'X-Backend-Storage-Policy-Index': '1'}
    req = Request.blank(
        '/sda1/p/', environ={'REQUEST_METHOD': 'REPLICATE'},
        headers=policy_headers,
    )
    now = time()
    statsd_client, statsd = self._do_test_timing_stats(self.conf, req, now)

    expected_legacy = {'timing_since': [
        (('REPLICATE.timing', now), {'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_legacy, statsd_client.calls)

    expected_labels = {
        'method': 'REPLICATE',
        'policy': 1,
        'skip_rehash': False,
        'status': 200,
    }
    expected_labeled = {'timing_since': [
        (('swift_object_server_request_timing', now),
         {'labels': expected_labels, 'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_labeled, statsd.calls)
def test_legacy_and_labeled_timing_stats_replicate_507(self):
    """A 507 goes to the legacy errors metric and the status label."""
    req = Request.blank(
        '/sda1/p/', environ={'REQUEST_METHOD': 'REPLICATE'})
    now = time()
    # mount_check will provoke a 507
    conf = dict(self.conf, mount_check='true')
    statsd_client, statsd = self._do_test_timing_stats(conf, req, now)

    expected_legacy = {'timing_since': [
        (('REPLICATE.errors.timing', now), {'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_legacy, statsd_client.calls)

    expected_labels = {
        'method': 'REPLICATE',
        'policy': 0,
        'skip_rehash': False,
        'status': 507,
    }
    expected_labeled = {'timing_since': [
        (('swift_object_server_request_timing', now),
         {'labels': expected_labels, 'sample_rate': 0.1}),
    ]}
    self.assertEqual(expected_labeled, statsd.calls)
def test_REQUEST_SPECIAL_CHARS(self):
obj = 'special昆%20/%'
# The path argument of Request.blank() is a WSGI string, somehow
@@ -6921,7 +7032,7 @@ class TestObjectController(BaseTestCase):
given_args.extend(args)
self.object_controller.async_update = fake_async_update
self.object_controller.logger = self.logger
self.logger.clear()
delete_at = time()
req_headers = {
'X-Timestamp': 1,
@@ -6975,7 +7086,7 @@ class TestObjectController(BaseTestCase):
given_args.extend(args)
self.object_controller.async_update = fake_async_update
self.object_controller.logger = self.logger
self.logger.clear()
delete_at = time()
req_headers = {
'X-Timestamp': 1,