From 541fac775e2b245881f6b6302f23ecc15a8fd01e Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 21 Jul 2016 12:21:31 -0400 Subject: [PATCH] Records metrics per cluster --- cassandra/metrics.py | 15 +++++++++- tests/integration/standard/test_metrics.py | 35 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/cassandra/metrics.py b/cassandra/metrics.py index cf1f25c1..61502717 100644 --- a/cassandra/metrics.py +++ b/cassandra/metrics.py @@ -114,7 +114,9 @@ class Metrics(object): def __init__(self, cluster_proxy): log.debug("Starting metric capture") - self.stats = scales.collection('/cassandra', + # TODO, modify the path to /cassandra/{clusterid} in 4.0 + self.stats_id = str(id(cluster_proxy)) + self.stats = scales.collection('/_cassandra/{0}'.format(self.stats_id), scales.PmfStat('request_timer'), scales.IntStat('connection_errors'), scales.IntStat('write_timeouts'), @@ -132,6 +134,11 @@ class Metrics(object): scales.Stat('open_connections', lambda: sum(sum(p.open_count for p in s._pools.values()) for s in cluster_proxy.sessions))) + # TODO, to be removed in 4.0 + # /cassandra contains the metrics of the first cluster registered + if 'cassandra' not in scales._Stats.stats: + scales._Stats.stats['cassandra'] = scales._Stats.stats['_cassandra'][self.stats_id] + self.request_timer = self.stats.request_timer self.connection_errors = self.stats.connection_errors self.write_timeouts = self.stats.write_timeouts @@ -164,3 +171,9 @@ class Metrics(object): def on_retry(self): self.stats.retries += 1 + + def get_stats(self): + """ + Returns the metrics for the registered cluster instance. + """ + return scales.getStats()['_cassandra'][self.stats_id] diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index aba83de4..d33bb0e2 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -173,3 +173,38 @@ class MetricsTests(unittest.TestCase): # def test_retry(self): # # TODO: Look for ways to generate retries # pass + + def test_metrics_per_cluster(self): + """ + Test that metrics are per cluster. + """ + + cluster2 = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION, + default_retry_policy=FallthroughRetryPolicy()) + session2 = cluster2.connect("test3rf", wait_for_all_pools=True) + + query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL) + self.session.execute(query) + + # Pause node so it shows as unreachable to coordinator + get_node(1).pause() + + try: + # Test write + query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL) + with self.assertRaises(WriteTimeout): + self.session.execute(query, timeout=None) + finally: + get_node(1).resume() + + stats_cluster1 = self.cluster.metrics.get_stats() + stats_cluster2 = cluster2.metrics.get_stats() + + self.assertEqual(1, self.cluster.metrics.stats.write_timeouts) + self.assertEqual(0, cluster2.metrics.stats.write_timeouts) + + self.assertNotEqual(0.0, self.cluster.metrics.request_timer['mean']) + self.assertEqual(0.0, cluster2.metrics.request_timer['mean']) + + self.assertNotEqual(0.0, stats_cluster1['request_timer']['mean']) + self.assertEqual(0.0, stats_cluster2['request_timer']['mean'])