Merge pull request #631 from datastax/py561
PYTHON-561: Records metrics per cluster
This commit is contained in:
@@ -114,7 +114,9 @@ class Metrics(object):
|
|||||||
def __init__(self, cluster_proxy):
|
def __init__(self, cluster_proxy):
|
||||||
log.debug("Starting metric capture")
|
log.debug("Starting metric capture")
|
||||||
|
|
||||||
self.stats = scales.collection('/cassandra',
|
# TODO, modify the path to /cassandra/{clusterid} in 4.0
|
||||||
|
self.stats_id = str(id(cluster_proxy))
|
||||||
|
self.stats = scales.collection('/_cassandra/{0}'.format(self.stats_id),
|
||||||
scales.PmfStat('request_timer'),
|
scales.PmfStat('request_timer'),
|
||||||
scales.IntStat('connection_errors'),
|
scales.IntStat('connection_errors'),
|
||||||
scales.IntStat('write_timeouts'),
|
scales.IntStat('write_timeouts'),
|
||||||
@@ -132,6 +134,11 @@ class Metrics(object):
|
|||||||
scales.Stat('open_connections',
|
scales.Stat('open_connections',
|
||||||
lambda: sum(sum(p.open_count for p in s._pools.values()) for s in cluster_proxy.sessions)))
|
lambda: sum(sum(p.open_count for p in s._pools.values()) for s in cluster_proxy.sessions)))
|
||||||
|
|
||||||
|
# TODO, to be removed in 4.0
|
||||||
|
# /cassandra contains the metrics of the first cluster registered
|
||||||
|
if 'cassandra' not in scales._Stats.stats:
|
||||||
|
scales._Stats.stats['cassandra'] = scales._Stats.stats['_cassandra'][self.stats_id]
|
||||||
|
|
||||||
self.request_timer = self.stats.request_timer
|
self.request_timer = self.stats.request_timer
|
||||||
self.connection_errors = self.stats.connection_errors
|
self.connection_errors = self.stats.connection_errors
|
||||||
self.write_timeouts = self.stats.write_timeouts
|
self.write_timeouts = self.stats.write_timeouts
|
||||||
@@ -164,3 +171,9 @@ class Metrics(object):
|
|||||||
|
|
||||||
def on_retry(self):
|
def on_retry(self):
|
||||||
self.stats.retries += 1
|
self.stats.retries += 1
|
||||||
|
|
||||||
|
def get_stats(self):
|
||||||
|
"""
|
||||||
|
Returns the metrics for the registered cluster instance.
|
||||||
|
"""
|
||||||
|
return scales.getStats()['_cassandra'][self.stats_id]
|
||||||
|
|||||||
@@ -173,3 +173,38 @@ class MetricsTests(unittest.TestCase):
|
|||||||
# def test_retry(self):
|
# def test_retry(self):
|
||||||
# # TODO: Look for ways to generate retries
|
# # TODO: Look for ways to generate retries
|
||||||
# pass
|
# pass
|
||||||
|
|
||||||
|
def test_metrics_per_cluster(self):
|
||||||
|
"""
|
||||||
|
Test that metrics are per cluster.
|
||||||
|
"""
|
||||||
|
|
||||||
|
cluster2 = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION,
|
||||||
|
default_retry_policy=FallthroughRetryPolicy())
|
||||||
|
session2 = cluster2.connect("test3rf", wait_for_all_pools=True)
|
||||||
|
|
||||||
|
query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL)
|
||||||
|
self.session.execute(query)
|
||||||
|
|
||||||
|
# Pause node so it shows as unreachable to coordinator
|
||||||
|
get_node(1).pause()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Test write
|
||||||
|
query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL)
|
||||||
|
with self.assertRaises(WriteTimeout):
|
||||||
|
self.session.execute(query, timeout=None)
|
||||||
|
finally:
|
||||||
|
get_node(1).resume()
|
||||||
|
|
||||||
|
stats_cluster1 = self.cluster.metrics.get_stats()
|
||||||
|
stats_cluster2 = cluster2.metrics.get_stats()
|
||||||
|
|
||||||
|
self.assertEqual(1, self.cluster.metrics.stats.write_timeouts)
|
||||||
|
self.assertEqual(0, cluster2.metrics.stats.write_timeouts)
|
||||||
|
|
||||||
|
self.assertNotEqual(0.0, self.cluster.metrics.request_timer['mean'])
|
||||||
|
self.assertEqual(0.0, cluster2.metrics.request_timer['mean'])
|
||||||
|
|
||||||
|
self.assertNotEqual(0.0, stats_cluster1['request_timer']['mean'])
|
||||||
|
self.assertEqual(0.0, stats_cluster2['request_timer']['mean'])
|
||||||
|
|||||||
Reference in New Issue
Block a user