Merge "Adds labeled cleaving metrics to the sharder"
This commit is contained in:
@@ -35,6 +35,7 @@ from swift.common.direct_client import (direct_put_container,
|
||||
from swift.common.daemon import run_daemon
|
||||
from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
|
||||
from swift.common.ring.utils import is_local_device
|
||||
from swift.common.statsd_client import get_labeled_statsd_client
|
||||
from swift.common.swob import str_to_wsgi
|
||||
from swift.common.utils import get_logger, config_true_value, \
|
||||
dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \
|
||||
@@ -850,11 +851,12 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
"""Shards containers."""
|
||||
log_route = 'container-sharder'
|
||||
|
||||
def __init__(self, conf, logger=None):
|
||||
def __init__(self, conf, logger=None, statsd=None):
|
||||
logger = logger or get_logger(conf, log_route=self.log_route)
|
||||
ContainerReplicator.__init__(self, conf, logger=logger)
|
||||
ContainerSharderConf.__init__(self, conf)
|
||||
ContainerSharderConf.validate_conf(self)
|
||||
self.statsd = statsd or get_labeled_statsd_client(conf, self.logger)
|
||||
self.shards_account_prefix = (AUTO_CREATE_ACCOUNT_PREFIX + 'shards_')
|
||||
self.sharding_candidates = []
|
||||
self.shrinking_candidates = []
|
||||
@@ -2193,6 +2195,17 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
self.db_logger.warning(broker, 'Failed to get own_shard_range')
|
||||
ranges_todo = [] # skip cleaving
|
||||
|
||||
labels = {
|
||||
'account': broker.account,
|
||||
'container': broker.container
|
||||
}
|
||||
if cleaving_context.ranges_done == 0 and own_shard_range:
|
||||
self.statsd.timing_since(
|
||||
'swift_container_sharder_time_to_first_cleave',
|
||||
float(own_shard_range.epoch), labels={
|
||||
**labels,
|
||||
'ranges_todo': cleaving_context.ranges_todo})
|
||||
|
||||
ranges_done = []
|
||||
for shard_range in ranges_todo:
|
||||
if cleaving_context.cleaving_done:
|
||||
@@ -2234,6 +2247,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
cleaving_context.store(broker)
|
||||
self.db_logger.debug(
|
||||
broker, 'Cleaved %s shard ranges', len(ranges_done))
|
||||
|
||||
if cleaving_context.cleaving_done:
|
||||
self.statsd.timing_since(
|
||||
'swift_container_sharder_time_to_last_cleave',
|
||||
float(own_shard_range.epoch), labels={
|
||||
**labels,
|
||||
'ranges_done': cleaving_context.ranges_done})
|
||||
|
||||
return (cleaving_context.misplaced_done and
|
||||
cleaving_context.cleaving_done)
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5, \
|
||||
ShardName, Namespace
|
||||
|
||||
from test.debug_logger import debug_logger
|
||||
from test.debug_logger import debug_logger, debug_labeled_statsd_client
|
||||
from test.unit import FakeRing, make_timestamp_iter, unlink_files, \
|
||||
mocked_http_conn, mock_timestamp_now, mock_timestamp_now_with_iter, \
|
||||
attach_fake_replication_rpc
|
||||
@@ -58,6 +58,13 @@ class BaseTestSharder(unittest.TestCase):
|
||||
self.tempdir = mkdtemp()
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
self.logger = debug_logger('sharder-test')
|
||||
conf = {
|
||||
'log_statsd_host': 'host',
|
||||
'log_statsd_port': 8125,
|
||||
'statsd_label_mode': 'dogstatsd',
|
||||
'statsd_emit_legacy': True,
|
||||
}
|
||||
self.statsd = debug_labeled_statsd_client(conf)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir, ignore_errors=True)
|
||||
@@ -150,6 +157,8 @@ class TestSharder(BaseTestSharder):
|
||||
logger = self.logger if use_logger else None
|
||||
if logger:
|
||||
logger.clear()
|
||||
self.statsd.clear()
|
||||
|
||||
with mock.patch(
|
||||
'swift.container.sharder.internal_client.InternalClient') \
|
||||
as mock_ic:
|
||||
@@ -468,6 +477,16 @@ class TestSharder(BaseTestSharder):
|
||||
stats = recon['sharding_stats']['sharding'].get(category)
|
||||
self.assertEqual(expected, stats)
|
||||
|
||||
def assert_labeled_timing_since_stats(self, exp_metrics_values_labels):
|
||||
statsd_calls = self.statsd.calls['timing_since']
|
||||
exp_calls = []
|
||||
for metric, value, labels in exp_metrics_values_labels:
|
||||
exp_calls.append(((metric, mock.ANY), {'labels': labels}))
|
||||
self.assertEqual(exp_calls, statsd_calls)
|
||||
for i, (metric, value, labels) in enumerate(exp_metrics_values_labels):
|
||||
self.assertAlmostEqual(
|
||||
value, statsd_calls[i][0][1], places=4, msg=i)
|
||||
|
||||
def test_increment_stats(self):
|
||||
with self._mock_sharder() as sharder:
|
||||
sharder._increment_stat('visited', 'success')
|
||||
@@ -1074,6 +1093,7 @@ class TestSharder(BaseTestSharder):
|
||||
@contextmanager
|
||||
def _mock_sharder(self, conf=None, replicas=3):
|
||||
self.logger.clear()
|
||||
self.statsd.clear()
|
||||
conf = conf or {}
|
||||
conf['devices'] = self.tempdir
|
||||
fake_ring = FakeRing(replicas=replicas, separate_replication=True)
|
||||
@@ -1082,7 +1102,8 @@ class TestSharder(BaseTestSharder):
|
||||
with mock.patch(
|
||||
'swift.common.db_replicator.ring.Ring',
|
||||
return_value=fake_ring):
|
||||
sharder = ContainerSharder(conf, logger=self.logger)
|
||||
sharder = ContainerSharder(conf, logger=self.logger,
|
||||
statsd=self.statsd)
|
||||
sharder._local_device_ids = {dev['id']: dev
|
||||
for dev in fake_ring.devs}
|
||||
sharder._replicate_object = mock.MagicMock(
|
||||
@@ -2561,7 +2582,7 @@ class TestSharder(BaseTestSharder):
|
||||
% broker.db_file, lines[0])
|
||||
self.assertIn(
|
||||
'Completed cleaving, DB set to sharded state, path: a/c, db: %s'
|
||||
% broker.db_file, lines[1:])
|
||||
% broker.db_file, lines)
|
||||
|
||||
self.assertTrue(self.logger.statsd_client.calls['timing_since'])
|
||||
self.assertEqual(
|
||||
@@ -2585,6 +2606,13 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertGreater(
|
||||
self.logger.statsd_client.calls['timing_since'][-1][0][1], 0)
|
||||
|
||||
self.assert_labeled_timing_since_stats([
|
||||
('swift_container_sharder_time_to_first_cleave', mock.ANY,
|
||||
{'account': 'a', 'container': 'c', 'ranges_todo': 2}),
|
||||
('swift_container_sharder_time_to_last_cleave', mock.ANY,
|
||||
{'account': 'a', 'container': 'c', 'ranges_done': 2}),
|
||||
])
|
||||
|
||||
# check shard ranges were updated to ACTIVE
|
||||
self.assertEqual([ShardRange.ACTIVE] * 2,
|
||||
[sr.state for sr in broker.get_shard_ranges()])
|
||||
|
||||
Reference in New Issue
Block a user