Basic dictionary reporter in place of the java JMX reporter.
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
from .compound_stat import NamedMeasurable
|
||||
from .dict_reporter import DictReporter
|
||||
from .kafka_metric import KafkaMetric
|
||||
from .measurable import AnonMeasurable
|
||||
from .metric_config import MetricConfig
|
||||
@@ -7,6 +8,6 @@ from .metrics import Metrics
|
||||
from .quota import Quota
|
||||
|
||||
__all__ = [
|
||||
'AnonMeasurable', 'KafkaMetric', 'MetricConfig',
|
||||
'AnonMeasurable', 'DictReporter', 'KafkaMetric', 'MetricConfig',
|
||||
'MetricName', 'Metrics', 'NamedMeasurable', 'Quota'
|
||||
]
|
||||
|
82
kafka/metrics/dict_reporter.py
Normal file
82
kafka/metrics/dict_reporter.py
Normal file
@@ -0,0 +1,82 @@
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from kafka.metrics.metrics_reporter import AbstractMetricsReporter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DictReporter(AbstractMetricsReporter):
|
||||
"""A basic dictionary based metrics reporter.
|
||||
|
||||
Store all metrics in a two level dictionary of category > name > metric.
|
||||
"""
|
||||
def __init__(self, prefix=''):
|
||||
self._lock = threading.RLock()
|
||||
self._prefix = prefix if prefix else '' # never allow None
|
||||
self._store = {}
|
||||
|
||||
def snapshot(self):
|
||||
"""
|
||||
Return a nested dictionary snapshot of all metrics and their
|
||||
values at this time. Example:
|
||||
{
|
||||
'category': {
|
||||
'metric1_name': 42.0,
|
||||
'metric2_name': 'foo'
|
||||
}
|
||||
}
|
||||
"""
|
||||
return dict((category, dict((name, metric.value())
|
||||
for name, metric in metrics.items()))
|
||||
for category, metrics in
|
||||
self._store.items())
|
||||
|
||||
def init(self, metrics):
|
||||
with self._lock:
|
||||
for metric in metrics:
|
||||
self.metric_change(metric)
|
||||
|
||||
def metric_change(self, metric):
|
||||
with self._lock:
|
||||
category = self.get_category(metric)
|
||||
if category not in self._store:
|
||||
self._store[category] = {}
|
||||
self._store[category][metric.metric_name.name] = metric
|
||||
|
||||
def metric_removal(self, metric):
|
||||
with self._lock:
|
||||
category = self.get_category(metric)
|
||||
metrics = self._store.get(category, {})
|
||||
removed = metrics.pop(metric.metric_name.name, None)
|
||||
if not metrics:
|
||||
self._store.pop(category, None)
|
||||
return removed
|
||||
|
||||
def get_category(self, metric):
|
||||
"""
|
||||
Return a string category for the metric.
|
||||
|
||||
The category is made up of this reporter's prefix and the
|
||||
metric's group and tags.
|
||||
|
||||
Examples:
|
||||
prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2}
|
||||
returns: 'foo.bar.a=1,b=2'
|
||||
|
||||
prefix = 'foo', group = 'bar', tags = None
|
||||
returns: 'foo.bar'
|
||||
|
||||
prefix = None, group = 'bar', tags = None
|
||||
returns: 'bar'
|
||||
"""
|
||||
tags = ','.join('%s=%s' % (k, v) for k, v in
|
||||
sorted(metric.metric_name.tags.items()))
|
||||
return '.'.join(x for x in
|
||||
[self._prefix, metric.metric_name.group, tags] if x)
|
||||
|
||||
def configure(self, configs):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
@@ -4,7 +4,7 @@ import time
|
||||
import pytest
|
||||
|
||||
from kafka.errors import QuotaViolationError
|
||||
from kafka.metrics import MetricConfig, MetricName, Metrics, Quota
|
||||
from kafka.metrics import DictReporter, MetricConfig, MetricName, Metrics, Quota
|
||||
from kafka.metrics.measurable import AbstractMeasurable
|
||||
from kafka.metrics.stats import (Avg, Count, Max, Min, Percentile, Percentiles,
|
||||
Rate, Total)
|
||||
@@ -25,8 +25,13 @@ def config():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metrics(request, config):
|
||||
metrics = Metrics(config, None, enable_expiration=True)
|
||||
def reporter():
|
||||
return DictReporter()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metrics(request, config, reporter):
|
||||
metrics = Metrics(config, [reporter], enable_expiration=True)
|
||||
request.addfinalizer(lambda: metrics.close())
|
||||
return metrics
|
||||
|
||||
@@ -440,6 +445,34 @@ def test_rate_windowing(mocker, time_keeper, metrics):
|
||||
< EPS, 'Elapsed Time = 75 seconds'
|
||||
|
||||
|
||||
def test_reporter(metrics):
|
||||
reporter = DictReporter()
|
||||
foo_reporter = DictReporter(prefix='foo')
|
||||
metrics.add_reporter(reporter)
|
||||
metrics.add_reporter(foo_reporter)
|
||||
sensor = metrics.sensor('kafka.requests')
|
||||
sensor.add(metrics.metric_name('pack.bean1.avg', 'grp1'), Avg())
|
||||
sensor.add(metrics.metric_name('pack.bean2.total', 'grp2'), Total())
|
||||
sensor2 = metrics.sensor('kafka.blah')
|
||||
sensor2.add(metrics.metric_name('pack.bean1.some', 'grp1'), Total())
|
||||
sensor2.add(metrics.metric_name('pack.bean2.some', 'grp1',
|
||||
tags={'a': 42, 'b': 'bar'}), Total())
|
||||
|
||||
# kafka-metrics-count > count is the total number of metrics and automatic
|
||||
expected = {
|
||||
'kafka-metrics-count': {'count': 5.0},
|
||||
'grp2': {'pack.bean2.total': 0.0},
|
||||
'grp1': {'pack.bean1.avg': 0.0, 'pack.bean1.some': 0.0},
|
||||
'grp1.a=42,b=bar': {'pack.bean2.some': 0.0},
|
||||
}
|
||||
assert expected == reporter.snapshot()
|
||||
|
||||
for key in list(expected.keys()):
|
||||
metrics = expected.pop(key)
|
||||
expected['foo.%s' % key] = metrics
|
||||
assert expected == foo_reporter.snapshot()
|
||||
|
||||
|
||||
class ConstantMeasurable(AbstractMeasurable):
|
||||
_value = 0.0
|
||||
|
||||
|
Reference in New Issue
Block a user