Merge pull request #637 from zackdever/metrics

Metrics java port
This commit is contained in:
Zack Dever
2016-04-14 11:02:01 -07:00
32 changed files with 1985 additions and 112 deletions

View File

@@ -3,11 +3,13 @@ from __future__ import absolute_import
import collections
import copy
import logging
import time
import six
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
@@ -40,7 +42,8 @@ class Fetcher(six.Iterator):
'api_version': (0, 8, 0),
}
def __init__(self, client, subscriptions, **configs):
def __init__(self, client, subscriptions, metrics, metric_group_prefix,
**configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
@@ -68,8 +71,6 @@ class Fetcher(six.Iterator):
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
"""
#metrics=None,
#metric_group_prefix='consumer',
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -83,8 +84,7 @@ class Fetcher(six.Iterator):
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
#self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
@@ -109,7 +109,7 @@ class Fetcher(six.Iterator):
if self._client.ready(node_id):
log.debug("Sending FetchRequest to node %s", node_id)
future = self._client.send(node_id, request)
future.add_callback(self._handle_fetch_response, request)
future.add_callback(self._handle_fetch_response, request, time.time())
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
futures.append(future)
self._fetch_futures.extend(futures)
@@ -575,10 +575,11 @@ class Fetcher(six.Iterator):
partition_data.items())
return requests
def _handle_fetch_response(self, request, response):
def _handle_fetch_response(self, request, send_time, response):
"""The callback for fetch completion"""
#total_bytes = 0
#total_count = 0
total_bytes = 0
total_count = 0
recv_time = time.time()
fetch_offsets = {}
for topic, partitions in request.topics:
@@ -609,6 +610,7 @@ class Fetcher(six.Iterator):
position)
continue
num_bytes = 0
partial = None
if messages and isinstance(messages[-1][-1], PartialMessage):
partial = messages.pop()
@@ -618,18 +620,18 @@ class Fetcher(six.Iterator):
" offset %d to buffered record list", tp,
position)
self._records.append((fetch_offset, tp, messages))
#last_offset, _, _ = messages[-1]
#self.sensors.records_fetch_lag.record(highwater - last_offset)
last_offset, _, _ = messages[-1]
self._sensors.records_fetch_lag.record(highwater - last_offset)
num_bytes = sum(msg[1] for msg in messages)
elif partial:
# we did not read a single message from a non-empty
# buffer because that message's size is larger than
# fetch size, in this case record this exception
self._record_too_large_partitions[tp] = fetch_offset
# TODO: bytes metrics
#self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
#totalBytes += num_bytes;
#totalCount += parsed.size();
self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
total_bytes += num_bytes
total_count += len(messages)
elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError):
self._client.cluster.request_update()
@@ -649,56 +651,82 @@ class Fetcher(six.Iterator):
else:
raise error_type('Unexpected error while fetching data')
"""TOOD - metrics
self.sensors.bytesFetched.record(totalBytes)
self.sensors.recordsFetched.record(totalCount)
self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime())
self.sensors.fetchLatency.record(resp.requestLatencyMs())
self._sensors.bytes_fetched.record(total_bytes)
self._sensors.records_fetched.record(total_count)
self._sensors.fetch_throttle_time_sensor.record(response['throttle_time_ms'])
self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
self.metrics = metrics
self.group_name = prefix + "-fetch-manager-metrics"
self.group_name = '%s-fetch-manager-metrics' % prefix
self.bytes_fetched = metrics.sensor("bytes-fetched")
self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name,
"The average number of bytes fetched per request"), metrics.Avg())
self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name,
"The maximum number of bytes fetched per request"), metrics.Max())
self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name,
"The average number of bytes consumed per second"), metrics.Rate())
self.bytes_fetched = metrics.sensor('bytes-fetched')
self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
'The average number of bytes fetched per request'), Avg())
self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name,
'The maximum number of bytes fetched per request'), Max())
self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name,
'The average number of bytes consumed per second'), Rate())
self.records_fetched = self.metrics.sensor("records-fetched")
self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name,
"The average number of records in each request"), metrics.Avg())
self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name,
"The average number of records consumed per second"), metrics.Rate())
self.records_fetched = self.metrics.sensor('records-fetched')
self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name,
'The average number of records in each request'), Avg())
self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name,
'The average number of records consumed per second'), Rate())
self.fetch_latency = metrics.sensor("fetch-latency")
self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name,
"The average time taken for a fetch request."), metrics.Avg())
self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name,
"The max time taken for any fetch request."), metrics.Max())
self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name,
"The number of fetch requests per second."), metrics.Rate(metrics.Count()))
self.fetch_latency = metrics.sensor('fetch-latency')
self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name,
'The average time taken for a fetch request.'), Avg())
self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name,
'The max time taken for any fetch request.'), Max())
self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name,
'The number of fetch requests per second.'), Rate(sampled_stat=Count()))
self.records_fetch_lag = metrics.sensor("records-lag")
self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name,
"The maximum lag in terms of number of records for any partition in self window"), metrics.Max())
self.records_fetch_lag = metrics.sensor('records-lag')
self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
'The maximum lag in terms of number of records for any partition in self window'), Max())
self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time")
self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name,
"The average throttle time in ms"), metrics.Avg())
self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name,
"The maximum throttle time in ms"), metrics.Max())
self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
'The average throttle time in ms'), Avg())
self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
'The maximum throttle time in ms'), Max())
def record_topic_fetch_metrics(topic, num_bytes, num_records):
# record bytes fetched
name = '.'.join(["topic", topic, "bytes-fetched"])
self.metrics[name].record(num_bytes);
def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
metric_tags = {'topic': topic.replace('.', '_')}
# record records fetched
name = '.'.join(["topic", topic, "records-fetched"])
self.metrics[name].record(num_records)
"""
# record bytes fetched
name = '.'.join(['topic', topic, 'bytes-fetched'])
bytes_fetched = self.metrics.get_sensor(name)
if not bytes_fetched:
bytes_fetched = self.metrics.sensor(name)
bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
self.group_name,
'The average number of bytes fetched per request for topic %s' % topic,
metric_tags), Avg())
bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
self.group_name,
'The maximum number of bytes fetched per request for topic %s' % topic,
metric_tags), Max())
bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
self.group_name,
'The average number of bytes consumed per second for topic %s' % topic,
metric_tags), Rate())
bytes_fetched.record(num_bytes)
# record records fetched
name = '.'.join(['topic', topic, 'records-fetched'])
records_fetched = self.metrics.get_sensor(name)
if not records_fetched:
records_fetched = self.metrics.sensor(name)
records_fetched.add(self.metrics.metric_name('records-per-request-avg',
self.group_name,
'The average number of records in each request for topic %s' % topic,
metric_tags), Avg())
records_fetched.add(self.metrics.metric_name('records-consumed-rate',
self.group_name,
'The average number of records consumed per second for topic %s' % topic,
metric_tags), Rate())
records_fetched.record(num_records)

View File

@@ -12,6 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.metrics import DictReporter, MetricConfig, Metrics
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
@@ -143,6 +144,13 @@ class KafkaConsumer(six.Iterator):
offset commits; 0.8.0 is what is left. If set to 'auto', will
attempt to infer the broker version by probing various APIs.
Default: auto
metric_reporters (list): A list of classes to use as metrics reporters.
Implementing the AbstractMetricsReporter interface allows plugging
in classes that will be notified of new metric creation. Default: []
metrics_num_samples (int): The number of samples maintained to compute
metrics. Default: 2
metrics_sample_window_ms (int): The number of samples maintained to
compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -181,9 +189,9 @@ class KafkaConsumer(six.Iterator):
'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
#'metric_reporters': None,
#'metrics_num_samples': 2,
#'metrics_sample_window_ms': 30000,
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
}
def __init__(self, *topics, **configs):
@@ -202,6 +210,16 @@ class KafkaConsumer(six.Iterator):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
reporters.append(DictReporter('kafka.consumer'))
self._metrics = Metrics(metric_config, reporters)
metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
self._client = KafkaClient(**self.config)
# Check Broker Version if not set explicitly
@@ -215,16 +233,15 @@ class KafkaConsumer(six.Iterator):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, **self.config)
self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
self._coordinator = ConsumerCoordinator(
self._client, self._subscription,
self._client, self._subscription, self._metrics, metric_group_prefix,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
self._iterator = None
self._consumer_timeout = float('inf')
#self.metrics = None
if topics:
self._subscription.subscribe(topics=topics)
self._client.set_topics(topics)
@@ -277,7 +294,7 @@ class KafkaConsumer(six.Iterator):
log.debug("Closing the KafkaConsumer.")
self._closed = True
self._coordinator.close()
#self.metrics.close()
self._metrics.close()
self._client.close()
try:
self.config['key_deserializer'].close()

View File

@@ -658,7 +658,7 @@ class GroupCoordinatorMetrics(object):
self.heartbeat_latency.add(metrics.metricName(
"heartbeat-rate", self.group_name,
"The average number of heartbeats per second",
tags), metrics.Rate(metrics.Count()))
tags), metrics.Rate(sampled_stat=metrics.Count()))
self.join_latency = metrics.sensor("join-latency")
self.join_latency.add(metrics.metricName(
@@ -672,7 +672,7 @@ class GroupCoordinatorMetrics(object):
self.join_latency.add(metrics.metricName(
"join-rate", self.group_name,
"The number of group joins per second",
tags), metrics.Rate(metrics.Count()))
tags), metrics.Rate(sampled_stat=metrics.Count()))
self.sync_latency = metrics.sensor("sync-latency")
self.sync_latency.add(metrics.metricName(
@@ -686,7 +686,7 @@ class GroupCoordinatorMetrics(object):
self.sync_latency.add(metrics.metricName(
"sync-rate", self.group_name,
"The number of group syncs per second",
tags), metrics.Rate(metrics.Count()))
tags), metrics.Rate(sampled_stat=metrics.Count()))
"""
lastHeartbeat = Measurable(

View File

@@ -14,6 +14,8 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from .. import errors as Errors
from ..future import Future
from ..metrics import AnonMeasurable
from ..metrics.stats import Avg, Count, Max, Rate
from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
@@ -36,7 +38,8 @@ class ConsumerCoordinator(BaseCoordinator):
'api_version': (0, 9),
}
def __init__(self, client, subscription, **configs):
def __init__(self, client, subscription, metrics, metric_group_prefix,
**configs):
"""Initialize the coordination manager.
Keyword Arguments:
@@ -97,10 +100,8 @@ class ConsumerCoordinator(BaseCoordinator):
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
# metrics=None,
# metric_group_prefix=None,
# metric_tags=None,
# self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
self._subscription)
def __del__(self):
if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
@@ -470,12 +471,13 @@ class ConsumerCoordinator(BaseCoordinator):
future = Future()
_f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_commit_response, offsets, future)
_f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
_f.add_errback(self._failed_request, node_id, request, future)
return future
def _handle_offset_commit_response(self, offsets, future, response):
#self.sensors.commit_latency.record(response.requestLatencyMs())
def _handle_offset_commit_response(self, offsets, future, send_time, response):
# TODO look at adding request_latency_ms to response (like java kafka)
self._sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
for topic, partitions in response.topics:
@@ -720,38 +722,25 @@ class AutoCommitTask(object):
self._reschedule(next_at)
# TODO
"""
class ConsumerCoordinatorMetrics(object):
def __init__(self, metrics, prefix, tags):
def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
self.group_name = prefix + "-coordinator-metrics"
self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
self.commit_latency = metrics.sensor("commit-latency")
self.commit_latency.add(metrics.MetricName(
"commit-latency-avg", self.group_name,
"The average time taken for a commit request",
tags), metrics.Avg())
self.commit_latency.add(metrics.MetricName(
"commit-latency-max", self.group_name,
"The max time taken for a commit request",
tags), metrics.Max())
self.commit_latency.add(metrics.MetricName(
"commit-rate", self.group_name,
"The number of commit calls per second",
tags), metrics.Rate(metrics.Count()))
self.commit_latency = metrics.sensor('commit-latency')
self.commit_latency.add(metrics.metric_name(
'commit-latency-avg', self.metric_group_name,
'The average time taken for a commit request'), Avg())
self.commit_latency.add(metrics.metric_name(
'commit-latency-max', self.metric_group_name,
'The max time taken for a commit request'), Max())
self.commit_latency.add(metrics.metric_name(
'commit-rate', self.metric_group_name,
'The number of commit calls per second'), Rate(sampled_stat=Count()))
'''
def _num_partitions(config, now):
new Measurable() {
public double measure(MetricConfig config, long now) {
return subscriptions.assignedPartitions().size();
}
};
metrics.addMetric(new MetricName("assigned-partitions",
this.metricGrpName,
"The number of partitions currently assigned to this consumer",
tags),
numParts);
'''
"""
num_parts = AnonMeasurable(lambda config, now:
len(subscription.assigned_partitions()))
metrics.add_metric(metrics.metric_name(
'assigned-partitions', self.metric_group_name,
'The number of partitions currently assigned to this consumer'),
num_parts)

View File

@@ -361,6 +361,10 @@ class KafkaConfigurationError(KafkaError):
pass
class QuotaViolationError(KafkaError):
pass
class AsyncProducerQueueFull(KafkaError):
def __init__(self, failed_msgs, *args):
super(AsyncProducerQueueFull, self).__init__(*args)

13
kafka/metrics/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
from .compound_stat import NamedMeasurable
from .dict_reporter import DictReporter
from .kafka_metric import KafkaMetric
from .measurable import AnonMeasurable
from .metric_config import MetricConfig
from .metric_name import MetricName
from .metrics import Metrics
from .quota import Quota
__all__ = [
'AnonMeasurable', 'DictReporter', 'KafkaMetric', 'MetricConfig',
'MetricName', 'Metrics', 'NamedMeasurable', 'Quota'
]

View File

@@ -0,0 +1,32 @@
import abc
from kafka.metrics.stat import AbstractStat
class AbstractCompoundStat(AbstractStat):
"""
A compound stat is a stat where a single measurement and associated
data structure feeds many metrics. This is the example for a
histogram which has many associated percentiles.
"""
__metaclass__ = abc.ABCMeta
def stats(self):
"""
Return list of NamedMeasurable
"""
raise NotImplementedError
class NamedMeasurable(object):
def __init__(self, metric_name, measurable_stat):
self._name = metric_name
self._stat = measurable_stat
@property
def name(self):
return self._name
@property
def stat(self):
return self._stat

View File

@@ -0,0 +1,81 @@
import logging
import threading
from kafka.metrics.metrics_reporter import AbstractMetricsReporter
logger = logging.getLogger(__name__)
class DictReporter(AbstractMetricsReporter):
"""A basic dictionary based metrics reporter.
Store all metrics in a two level dictionary of category > name > metric.
"""
def __init__(self, prefix=''):
self._lock = threading.Lock()
self._prefix = prefix if prefix else '' # never allow None
self._store = {}
def snapshot(self):
"""
Return a nested dictionary snapshot of all metrics and their
values at this time. Example:
{
'category': {
'metric1_name': 42.0,
'metric2_name': 'foo'
}
}
"""
return dict((category, dict((name, metric.value())
for name, metric in list(metrics.items())))
for category, metrics in
list(self._store.items()))
def init(self, metrics):
for metric in metrics:
self.metric_change(metric)
def metric_change(self, metric):
with self._lock:
category = self.get_category(metric)
if category not in self._store:
self._store[category] = {}
self._store[category][metric.metric_name.name] = metric
def metric_removal(self, metric):
with self._lock:
category = self.get_category(metric)
metrics = self._store.get(category, {})
removed = metrics.pop(metric.metric_name.name, None)
if not metrics:
self._store.pop(category, None)
return removed
def get_category(self, metric):
"""
Return a string category for the metric.
The category is made up of this reporter's prefix and the
metric's group and tags.
Examples:
prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2}
returns: 'foo.bar.a=1,b=2'
prefix = 'foo', group = 'bar', tags = None
returns: 'foo.bar'
prefix = None, group = 'bar', tags = None
returns: 'bar'
"""
tags = ','.join('%s=%s' % (k, v) for k, v in
sorted(metric.metric_name.tags.items()))
return '.'.join(x for x in
[self._prefix, metric.metric_name.group, tags] if x)
def configure(self, configs):
pass
def close(self):
pass

View File

@@ -0,0 +1,34 @@
import time
class KafkaMetric(object):
# NOTE java constructor takes a lock instance
def __init__(self, metric_name, measurable, config):
if not metric_name:
raise ValueError('metric_name must be non-empty')
if not measurable:
raise ValueError('measurable must be non-empty')
self._metric_name = metric_name
self._measurable = measurable
self._config = config
@property
def metric_name(self):
return self._metric_name
@property
def measurable(self):
return self._measurable
@property
def config(self):
return self._config
@config.setter
def config(self, config):
self._config = config
def value(self, time_ms=None):
if time_ms is None:
time_ms = time.time() * 1000
return self.measurable.measure(self.config, time_ms)

View File

@@ -0,0 +1,27 @@
import abc
class AbstractMeasurable(object):
"""A measurable quantity that can be registered as a metric"""
@abc.abstractmethod
def measure(self, config, now):
"""
Measure this quantity and return the result
Arguments:
config (MetricConfig): The configuration for this metric
now (int): The POSIX time in milliseconds the measurement
is being taken
Returns:
The measured value
"""
raise NotImplementedError
class AnonMeasurable(AbstractMeasurable):
def __init__(self, measure_fn):
self._measure_fn = measure_fn
def measure(self, config, now):
return float(self._measure_fn(config, now))

View File

@@ -0,0 +1,14 @@
import abc
from kafka.metrics.measurable import AbstractMeasurable
from kafka.metrics.stat import AbstractStat
class AbstractMeasurableStat(AbstractStat, AbstractMeasurable):
"""
An AbstractMeasurableStat is an AbstractStat that is also
an AbstractMeasurable (i.e. can produce a single floating point value).
This is the interface used for most of the simple statistics such
as Avg, Max, Count, etc.
"""
__metaclass__ = abc.ABCMeta

View File

@@ -0,0 +1,31 @@
import sys
class MetricConfig(object):
"""Configuration values for metrics"""
def __init__(self, quota=None, samples=2, event_window=sys.maxsize,
time_window_ms=30 * 1000, tags=None):
"""
Arguments:
quota (Quota, optional): Upper or lower bound of a value.
samples (int, optional): Max number of samples kept per metric.
event_window (int, optional): Max number of values per sample.
time_window_ms (int, optional): Max age of an individual sample.
tags (dict of {str: str}, optional): Tags for each metric.
"""
self.quota = quota
self._samples = samples
self.event_window = event_window
self.time_window_ms = time_window_ms
# tags should be OrderedDict (not supported in py26)
self.tags = tags if tags else {}
@property
def samples(self):
return self._samples
@samples.setter
def samples(self, value):
if value < 1:
raise ValueError('The number of samples must be at least 1.')
self._samples = value

View File

@@ -0,0 +1,104 @@
import copy
class MetricName(object):
"""
This class encapsulates a metric's name, logical group and its
related attributes (tags).
group, tags parameters can be used to create unique metric names.
e.g. domainName:type=group,key1=val1,key2=val2
Usage looks something like this:
# set up metrics:
metric_tags = {'client-id': 'producer-1', 'topic': 'topic'}
metric_config = MetricConfig(tags=metric_tags)
# metrics is the global repository of metrics and sensors
metrics = Metrics(metric_config)
sensor = metrics.sensor('message-sizes')
metric_name = metrics.metric_name('message-size-avg',
'producer-metrics',
'average message size')
sensor.add(metric_name, Avg())
metric_name = metrics.metric_name('message-size-max',
sensor.add(metric_name, Max())
tags = {'client-id': 'my-client', 'topic': 'my-topic'}
metric_name = metrics.metric_name('message-size-min',
'producer-metrics',
'message minimum size', tags)
sensor.add(metric_name, Min())
# as messages are sent we record the sizes
sensor.record(message_size)
"""
def __init__(self, name, group, description=None, tags=None):
"""
Arguments:
name (str): The name of the metric.
group (str): The logical group name of the metrics to which this
metric belongs.
description (str, optional): A human-readable description to
include in the metric.
tags (dict, optional): Additional key/val attributes of the metric.
"""
if not (name and group):
raise Exception('name and group must be non-empty.')
if tags is not None and not isinstance(tags, dict):
raise Exception('tags must be a dict if present.')
self._name = name
self._group = group
self._description = description
self._tags = copy.copy(tags)
self._hash = 0
@property
def name(self):
return self._name
@property
def group(self):
return self._group
@property
def description(self):
return self._description
@property
def tags(self):
return copy.copy(self._tags)
def __hash__(self):
if self._hash != 0:
return self._hash
prime = 31
result = 1
result = prime * result + hash(self.group)
result = prime * result + hash(self.name)
tags_hash = hash(frozenset(self.tags.items())) if self.tags else 0
result = prime * result + tags_hash
self._hash = result
return result
def __eq__(self, other):
if self is other:
return True
if other is None:
return False
return (type(self) == type(other) and
self.group == other.group and
self.name == other.name and
self.tags == other.tags)
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return 'MetricName(name=%s, group=%s, description=%s, tags=%s)' % (
self.name, self.group, self.description, self.tags)

257
kafka/metrics/metrics.py Normal file
View File

@@ -0,0 +1,257 @@
import logging
import sys
import time
import threading
from kafka.metrics import AnonMeasurable, KafkaMetric, MetricConfig, MetricName
from kafka.metrics.stats import Sensor
logger = logging.getLogger(__name__)
class Metrics(object):
"""
A registry of sensors and metrics.
A metric is a named, numerical measurement. A sensor is a handle to
record numerical measurements as they occur. Each Sensor has zero or
more associated metrics. For example a Sensor might represent message
sizes and we might associate with this sensor a metric for the average,
maximum, or other statistics computed off the sequence of message sizes
that are recorded by the sensor.
Usage looks something like this:
# set up metrics:
metrics = Metrics() # the global repository of metrics and sensors
sensor = metrics.sensor('message-sizes')
metric_name = MetricName('message-size-avg', 'producer-metrics')
sensor.add(metric_name, Avg())
metric_name = MetricName('message-size-max', 'producer-metrics')
sensor.add(metric_name, Max())
# as messages are sent we record the sizes
sensor.record(message_size);
"""
def __init__(self, default_config=None, reporters=None,
enable_expiration=False):
"""
Create a metrics repository with a default config, given metric
reporters and the ability to expire eligible sensors
Arguments:
default_config (MetricConfig, optional): The default config
reporters (list of AbstractMetricsReporter, optional):
The metrics reporters
enable_expiration (bool, optional): true if the metrics instance
can garbage collect inactive sensors, false otherwise
"""
self._lock = threading.RLock()
self._config = default_config or MetricConfig()
self._sensors = {}
self._metrics = {}
self._children_sensors = {}
self._reporters = reporters or []
for reporter in self._reporters:
reporter.init([])
if enable_expiration:
def expire_loop():
while True:
# delay 30 seconds
time.sleep(30)
self.ExpireSensorTask.run(self)
metrics_scheduler = threading.Thread(target=expire_loop)
# Creating a daemon thread to not block shutdown
metrics_scheduler.daemon = True
metrics_scheduler.start()
self.add_metric(self.metric_name('count', 'kafka-metrics-count',
'total number of registered metrics'),
AnonMeasurable(lambda config, now: len(self._metrics)))
@property
def config(self):
return self._config
@property
def metrics(self):
"""
Get all the metrics currently maintained and indexed by metricName
"""
return self._metrics
def metric_name(self, name, group, description='', tags=None):
"""
Create a MetricName with the given name, group, description and tags,
plus default tags specified in the metric configuration.
Tag in tags takes precedence if the same tag key is specified in
the default metric configuration.
Arguments:
name (str): The name of the metric
group (str): logical group name of the metrics to which this
metric belongs
description (str, optional): A human-readable description to
include in the metric
tags (dict, optionals): additional key/value attributes of
the metric
"""
combined_tags = dict(self.config.tags)
combined_tags.update(tags or {})
return MetricName(name, group, description, combined_tags)
def get_sensor(self, name):
"""
Get the sensor with the given name if it exists
Arguments:
name (str): The name of the sensor
Returns:
Sensor: The sensor or None if no such sensor exists
"""
if not name:
raise ValueError('name must be non-empty')
return self._sensors.get(name, None)
def sensor(self, name, config=None,
inactive_sensor_expiration_time_seconds=sys.maxsize,
parents=None):
"""
Get or create a sensor with the given unique name and zero or
more parent sensors. All parent sensors will receive every value
recorded with this sensor.
Arguments:
name (str): The name of the sensor
config (MetricConfig, optional): A default configuration to use
for this sensor for metrics that don't have their own config
inactive_sensor_expiration_time_seconds (int, optional):
If no value if recorded on the Sensor for this duration of
time, it is eligible for removal
parents (list of Sensor): The parent sensors
Returns:
Sensor: The sensor that is created
"""
sensor = self.get_sensor(name)
if sensor:
return sensor
with self._lock:
sensor = self.get_sensor(name)
if not sensor:
sensor = Sensor(self, name, parents, config or self.config,
inactive_sensor_expiration_time_seconds)
self._sensors[name] = sensor
if parents:
for parent in parents:
children = self._children_sensors.get(parent)
if not children:
children = []
self._children_sensors[parent] = children
children.append(sensor)
logger.debug('Added sensor with name %s', name)
return sensor
def remove_sensor(self, name):
"""
Remove a sensor (if it exists), associated metrics and its children.
Arguments:
name (str): The name of the sensor to be removed
"""
sensor = self._sensors.get(name)
if sensor:
child_sensors = None
with sensor._lock:
with self._lock:
val = self._sensors.pop(name, None)
if val and val == sensor:
for metric in sensor.metrics:
self.remove_metric(metric.metric_name)
logger.debug('Removed sensor with name %s', name)
child_sensors = self._children_sensors.pop(sensor, None)
if child_sensors:
for child_sensor in child_sensors:
self.remove_sensor(child_sensor.name)
def add_metric(self, metric_name, measurable, config=None):
"""
Add a metric to monitor an object that implements measurable.
This metric won't be associated with any sensor.
This is a way to expose existing values as metrics.
Arguments:
metricName (MetricName): The name of the metric
measurable (AbstractMeasurable): The measurable that will be
measured by this metric
config (MetricConfig, optional): The configuration to use when
measuring this measurable
"""
# NOTE there was a lock here, but i don't think it's needed
metric = KafkaMetric(metric_name, measurable, config or self.config)
self.register_metric(metric)
def remove_metric(self, metric_name):
"""
Remove a metric if it exists and return it. Return None otherwise.
If a metric is removed, `metric_removal` will be invoked
for each reporter.
Arguments:
metric_name (MetricName): The name of the metric
Returns:
KafkaMetric: the removed `KafkaMetric` or None if no such
metric exists
"""
with self._lock:
metric = self._metrics.pop(metric_name, None)
if metric:
for reporter in self._reporters:
reporter.metric_removal(metric)
return metric
def add_reporter(self, reporter):
"""Add a MetricReporter"""
with self._lock:
reporter.init(list(self.metrics.values()))
self._reporters.append(reporter)
def register_metric(self, metric):
with self._lock:
if metric.metric_name in self.metrics:
raise ValueError('A metric named "%s" already exists, cannot'
' register another one.' % metric.metric_name)
self.metrics[metric.metric_name] = metric
for reporter in self._reporters:
reporter.metric_change(metric)
class ExpireSensorTask(object):
"""
This iterates over every Sensor and triggers a remove_sensor
if it has expired. Package private for testing
"""
@staticmethod
def run(metrics):
items = list(metrics._sensors.items())
for name, sensor in items:
# remove_sensor also locks the sensor object. This is fine
# because synchronized is reentrant. There is however a minor
# race condition here. Assume we have a parent sensor P and
# child sensor C. Calling record on C would cause a record on
# P as well. So expiration time for P == expiration time for C.
# If the record on P happens via C just after P is removed,
# that will cause C to also get removed. Since the expiration
# time is typically high it is not expected to be a significant
# concern and thus not necessary to optimize
with sensor._lock:
if sensor.has_expired():
logger.debug('Removing expired sensor %s', name)
metrics.remove_sensor(name)
def close(self):
"""Close this metrics repository."""
for reporter in self._reporters:
reporter.close()

View File

@@ -0,0 +1,55 @@
import abc
class AbstractMetricsReporter(object):
"""
An abstract class to allow things to listen as new metrics
are created so they can be reported.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def init(self, metrics):
"""
This is called when the reporter is first registered
to initially register all existing metrics
Arguments:
metrics (list of KafkaMetric): All currently existing metrics
"""
raise NotImplementedError
@abc.abstractmethod
def metric_change(self, metric):
"""
This is called whenever a metric is updated or added
Arguments:
metric (KafkaMetric)
"""
raise NotImplementedError
@abc.abstractmethod
def metric_removal(self, metric):
"""
This is called whenever a metric is removed
Arguments:
metric (KafkaMetric)
"""
raise NotImplementedError
@abc.abstractmethod
def configure(self, configs):
"""
Configure this class with the given key-value pairs
Arguments:
configs (dict of {str, ?})
"""
raise NotImplementedError
@abc.abstractmethod
def close(self):
"""Called when the metrics repository is closed."""
raise NotImplementedError

39
kafka/metrics/quota.py Normal file
View File

@@ -0,0 +1,39 @@
class Quota(object):
"""An upper or lower bound for metrics"""
def __init__(self, bound, is_upper):
self._bound = bound
self._upper = is_upper
@staticmethod
def upper_bound(upper_bound):
return Quota(upper_bound, True)
@staticmethod
def lower_bound(lower_bound):
return Quota(lower_bound, False)
def is_upper_bound(self):
return self._upper
@property
def bound(self):
return self._bound
def is_acceptable(self, value):
return ((self.is_upper_bound() and value <= self.bound) or
(not self.is_upper_bound() and value >= self.bound))
def __hash__(self):
prime = 31
result = prime + self.bound
return prime * result + self.is_upper_bound()
def __eq__(self, other):
if self is other:
return True
return (type(self) == type(other) and
self.bound == other.bound and
self.is_upper_bound() == other.is_upper_bound())
def __ne__(self, other):
return not self.__eq__(other)

21
kafka/metrics/stat.py Normal file
View File

@@ -0,0 +1,21 @@
import abc
class AbstractStat(object):
"""
An AbstractStat is a quantity such as average, max, etc that is computed
off the stream of updates to a sensor
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def record(self, config, value, time_ms):
"""
Record the given value
Arguments:
config (MetricConfig): The configuration to use for this metric
value (float): The value to record
timeMs (int): The POSIX time in milliseconds this value occurred
"""
raise NotImplementedError

View File

@@ -0,0 +1,15 @@
from .avg import Avg
from .count import Count
from .histogram import Histogram
from .max_stat import Max
from .min_stat import Min
from .percentile import Percentile
from .percentiles import Percentiles
from .rate import Rate
from .sensor import Sensor
from .total import Total
__all__ = [
'Avg', 'Count', 'Histogram', 'Max', 'Min', 'Percentile', 'Percentiles',
'Rate', 'Sensor', 'Total'
]

View File

@@ -0,0 +1,22 @@
from kafka.metrics.stats.sampled_stat import AbstractSampledStat
class Avg(AbstractSampledStat):
"""
An AbstractSampledStat that maintains a simple average over its samples.
"""
def __init__(self):
super(Avg, self).__init__(0.0)
def update(self, sample, config, value, now):
sample.value += value
def combine(self, samples, config, now):
total_sum = 0
total_count = 0
for sample in samples:
total_sum += sample.value
total_count += sample.event_count
if not total_count:
return 0
return float(total_sum) / total_count

View File

@@ -0,0 +1,15 @@
from kafka.metrics.stats.sampled_stat import AbstractSampledStat
class Count(AbstractSampledStat):
"""
An AbstractSampledStat that maintains a simple count of what it has seen.
"""
def __init__(self):
super(Count, self).__init__(0.0)
def update(self, sample, config, value, now):
sample.value += 1.0
def combine(self, samples, config, now):
return float(sum(sample.value for sample in samples))

View File

@@ -0,0 +1,93 @@
import math
class Histogram(object):
def __init__(self, bin_scheme):
self._hist = [0.0] * bin_scheme.bins
self._count = 0.0
self._bin_scheme = bin_scheme
def record(self, value):
self._hist[self._bin_scheme.to_bin(value)] += 1.0
self._count += 1.0
def value(self, quantile):
if self._count == 0.0:
return float('NaN')
_sum = 0.0
quant = float(quantile)
for i, value in enumerate(self._hist[:-1]):
_sum += value
if _sum / self._count > quant:
return self._bin_scheme.from_bin(i)
return float('inf')
@property
def counts(self):
return self._hist
def clear(self):
for i in range(self._hist):
self._hist[i] = 0.0
self._count = 0
def __str__(self):
values = ['%.10f:%.0f' % (self._bin_scheme.from_bin(i), value) for
i, value in enumerate(self._hist[:-1])]
values.append('%s:%s' % (float('inf'), self._hist[-1]))
return '{%s}' % ','.join(values)
class ConstantBinScheme(object):
def __init__(self, bins, min_val, max_val):
if bins < 2:
raise ValueError('Must have at least 2 bins.')
self._min = float(min_val)
self._max = float(max_val)
self._bins = int(bins)
self._bucket_width = (max_val - min_val) / (bins - 2)
@property
def bins(self):
return self._bins
def from_bin(self, b):
if b == 0:
return float('-inf')
elif b == self._bins - 1:
return float('inf')
else:
return self._min + (b - 1) * self._bucket_width
def to_bin(self, x):
if x < self._min:
return 0
elif x > self._max:
return self._bins - 1
else:
return int(((x - self._min) / self._bucket_width) + 1)
class LinearBinScheme(object):
def __init__(self, num_bins, max_val):
self._bins = num_bins
self._max = max_val
self._scale = max_val / (num_bins * (num_bins - 1) / 2)
@property
def bins(self):
return self._bins
def from_bin(self, b):
if b == self._bins - 1:
return float('inf')
else:
unscaled = (b * (b + 1.0)) / 2.0
return unscaled * self._scale
def to_bin(self, x):
if x < 0.0:
raise ValueError('Values less than 0.0 not accepted.')
elif x > self._max:
return self._bins - 1
else:
scaled = x / self._scale
return int(-0.5 + math.sqrt(2.0 * scaled + 0.25))

View File

@@ -0,0 +1,15 @@
from kafka.metrics.stats.sampled_stat import AbstractSampledStat
class Max(AbstractSampledStat):
"""An AbstractSampledStat that gives the max over its samples."""
def __init__(self):
super(Max, self).__init__(float('-inf'))
def update(self, sample, config, value, now):
sample.value = max(sample.value, value)
def combine(self, samples, config, now):
if not samples:
return float('-inf')
return float(max(sample.value for sample in samples))

View File

@@ -0,0 +1,17 @@
import sys
from kafka.metrics.stats.sampled_stat import AbstractSampledStat
class Min(AbstractSampledStat):
"""An AbstractSampledStat that gives the min over its samples."""
def __init__(self):
super(Min, self).__init__(float(sys.maxsize))
def update(self, sample, config, value, now):
sample.value = min(sample.value, value)
def combine(self, samples, config, now):
if not samples:
return float(sys.maxsize)
return float(min(sample.value for sample in samples))

View File

@@ -0,0 +1,12 @@
class Percentile(object):
def __init__(self, metric_name, percentile):
self._metric_name = metric_name
self._percentile = float(percentile)
@property
def name(self):
return self._metric_name
@property
def percentile(self):
return self._percentile

View File

@@ -0,0 +1,72 @@
from kafka.metrics import AnonMeasurable, NamedMeasurable
from kafka.metrics.compound_stat import AbstractCompoundStat
from kafka.metrics.stats import Histogram
from kafka.metrics.stats.sampled_stat import AbstractSampledStat
class BucketSizing(object):
CONSTANT = 0
LINEAR = 1
class Percentiles(AbstractSampledStat, AbstractCompoundStat):
"""A compound stat that reports one or more percentiles"""
def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
percentiles=None):
super(Percentiles, self).__init__(0.0)
self._percentiles = percentiles or []
self._buckets = int(size_in_bytes / 4)
if bucketing == BucketSizing.CONSTANT:
self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
min_val, max_val)
elif bucketing == BucketSizing.LINEAR:
if min_val != 0.0:
raise ValueError('Linear bucket sizing requires min_val'
' to be 0.0.')
self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
else:
ValueError('Unknown bucket type: %s' % bucketing)
def stats(self):
measurables = []
def make_measure_fn(pct):
return lambda config, now: self.value(config, now,
pct / 100.0)
for percentile in self._percentiles:
measure_fn = make_measure_fn(percentile.percentile)
stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn))
measurables.append(stat)
return measurables
def value(self, config, now, quantile):
self.purge_obsolete_samples(config, now)
count = sum(sample.event_count for sample in self._samples)
if count == 0.0:
return float('NaN')
sum_val = 0.0
quant = float(quantile)
for b in range(self._buckets):
for sample in self._samples:
assert type(sample) is self.HistogramSample
hist = sample.histogram.counts
sum_val += hist[b]
if sum_val / count > quant:
return self._bin_scheme.from_bin(b)
return float('inf')
def combine(self, samples, config, now):
return self.value(config, now, 0.5)
def new_sample(self, time_ms):
return Percentiles.HistogramSample(self._bin_scheme, time_ms)
def update(self, sample, config, value, time_ms):
assert type(sample) is self.HistogramSample
sample.histogram.record(value)
class HistogramSample(AbstractSampledStat.Sample):
def __init__(self, scheme, now):
super(Percentiles.HistogramSample, self).__init__(0.0, now)
self.histogram = Histogram(scheme)

115
kafka/metrics/stats/rate.py Normal file
View File

@@ -0,0 +1,115 @@
from kafka.metrics.measurable_stat import AbstractMeasurableStat
from kafka.metrics.stats.sampled_stat import AbstractSampledStat
class TimeUnit(object):
_names = {
'nanosecond': 0,
'microsecond': 1,
'millisecond': 2,
'second': 3,
'minute': 4,
'hour': 5,
'day': 6,
}
NANOSECONDS = _names['nanosecond']
MICROSECONDS = _names['microsecond']
MILLISECONDS = _names['millisecond']
SECONDS = _names['second']
MINUTES = _names['minute']
HOURS = _names['hour']
DAYS = _names['day']
@staticmethod
def get_name(time_unit):
return TimeUnit._names[time_unit]
class Rate(AbstractMeasurableStat):
"""
The rate of the given quantity. By default this is the total observed
over a set of samples from a sampled statistic divided by the elapsed
time over the sample windows. Alternative AbstractSampledStat
implementations can be provided, however, to record the rate of
occurrences (e.g. the count of values measured over the time interval)
or other such values.
"""
def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None):
self._stat = sampled_stat or SampledTotal()
self._unit = time_unit
def unit_name(self):
return TimeUnit.get_name(self._unit)
def record(self, config, value, time_ms):
self._stat.record(config, value, time_ms)
def measure(self, config, now):
value = self._stat.measure(config, now)
return float(value) / self.convert(self.window_size(config, now))
def window_size(self, config, now):
# purge old samples before we compute the window size
self._stat.purge_obsolete_samples(config, now)
"""
Here we check the total amount of time elapsed since the oldest
non-obsolete window. This give the total window_size of the batch
which is the time used for Rate computation. However, there is
an issue if we do not have sufficient data for e.g. if only
1 second has elapsed in a 30 second window, the measured rate
will be very high. Hence we assume that the elapsed time is
always N-1 complete windows plus whatever fraction of the final
window is complete.
Note that we could simply count the amount of time elapsed in
the current window and add n-1 windows to get the total time,
but this approach does not account for sleeps. AbstractSampledStat
only creates samples whenever record is called, if no record is
called for a period of time that time is not accounted for in
window_size and produces incorrect results.
"""
total_elapsed_time_ms = now - self._stat.oldest(now).last_window_ms
# Check how many full windows of data we have currently retained
num_full_windows = int(total_elapsed_time_ms / config.time_window_ms)
min_full_windows = config.samples - 1
# If the available windows are less than the minimum required,
# add the difference to the totalElapsedTime
if num_full_windows < min_full_windows:
total_elapsed_time_ms += ((min_full_windows - num_full_windows) *
config.time_window_ms)
return total_elapsed_time_ms
def convert(self, time_ms):
if self._unit == TimeUnit.NANOSECONDS:
return time_ms * 1000.0 * 1000.0
elif self._unit == TimeUnit.MICROSECONDS:
return time_ms * 1000.0
elif self._unit == TimeUnit.MILLISECONDS:
return time_ms
elif self._unit == TimeUnit.SECONDS:
return time_ms / 1000.0
elif self._unit == TimeUnit.MINUTES:
return time_ms / (60.0 * 1000.0)
elif self._unit == TimeUnit.HOURS:
return time_ms / (60.0 * 60.0 * 1000.0)
elif self._unit == TimeUnit.DAYS:
return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
else:
raise ValueError('Unknown unit: %s' % self._unit)
class SampledTotal(AbstractSampledStat):
def __init__(self, initial_value=None):
if initial_value is not None:
raise ValueError('initial_value cannot be set on SampledTotal')
super(SampledTotal, self).__init__(0.0)
def update(self, sample, config, value, time_ms):
sample.value += value
def combine(self, samples, config, now):
return float(sum(sample.value for sample in samples))

View File

@@ -0,0 +1,99 @@
import abc
from kafka.metrics.measurable_stat import AbstractMeasurableStat
class AbstractSampledStat(AbstractMeasurableStat):
"""
An AbstractSampledStat records a single scalar value measured over
one or more samples. Each sample is recorded over a configurable
window. The window can be defined by number of events or elapsed
time (or both, if both are given the window is complete when
*either* the event count or elapsed time criterion is met).
All the samples are combined to produce the measurement. When a
window is complete the oldest sample is cleared and recycled to
begin recording the next sample.
Subclasses of this class define different statistics measured
using this basic pattern.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, initial_value):
self._initial_value = initial_value
self._samples = []
self._current = 0
@abc.abstractmethod
def update(self, sample, config, value, time_ms):
raise NotImplementedError
@abc.abstractmethod
def combine(self, samples, config, now):
raise NotImplementedError
def record(self, config, value, time_ms):
sample = self.current(time_ms)
if sample.is_complete(time_ms, config):
sample = self._advance(config, time_ms)
self.update(sample, config, float(value), time_ms)
sample.event_count += 1
def new_sample(self, time_ms):
return self.Sample(self._initial_value, time_ms)
def measure(self, config, now):
self.purge_obsolete_samples(config, now)
return float(self.combine(self._samples, config, now))
def current(self, time_ms):
if not self._samples:
self._samples.append(self.new_sample(time_ms))
return self._samples[self._current]
def oldest(self, now):
if not self._samples:
self._samples.append(self.new_sample(now))
oldest = self._samples[0]
for sample in self._samples[1:]:
if sample.last_window_ms < oldest.last_window_ms:
oldest = sample
return oldest
def purge_obsolete_samples(self, config, now):
"""
Timeout any windows that have expired in the absence of any events
"""
expire_age = config.samples * config.time_window_ms
for sample in self._samples:
if now - sample.last_window_ms >= expire_age:
sample.reset(now)
def _advance(self, config, time_ms):
self._current = (self._current + 1) % config.samples
if self._current >= len(self._samples):
sample = self.new_sample(time_ms)
self._samples.append(sample)
return sample
else:
sample = self.current(time_ms)
sample.reset(time_ms)
return sample
class Sample(object):
def __init__(self, initial_value, now):
self.initial_value = initial_value
self.event_count = 0
self.last_window_ms = now
self.value = initial_value
def reset(self, now):
self.event_count = 0
self.last_window_ms = now
self.value = self.initial_value
def is_complete(self, time_ms, config):
return (time_ms - self.last_window_ms >= config.time_window_ms or
self.event_count >= config.event_window)

View File

@@ -0,0 +1,132 @@
import threading
import time
from kafka.errors import QuotaViolationError
from kafka.metrics import KafkaMetric
class Sensor(object):
"""
A sensor applies a continuous sequence of numerical values
to a set of associated metrics. For example a sensor on
message size would record a sequence of message sizes using
the `record(double)` api and would maintain a set
of metrics about request sizes such as the average or max.
"""
def __init__(self, registry, name, parents, config,
inactive_sensor_expiration_time_seconds):
if not name:
raise ValueError('name must be non-empty')
self._lock = threading.RLock()
self._registry = registry
self._name = name
self._parents = parents or []
self._metrics = []
self._stats = []
self._config = config
self._inactive_sensor_expiration_time_ms = (
inactive_sensor_expiration_time_seconds * 1000)
self._last_record_time = time.time() * 1000
self._check_forest(set())
def _check_forest(self, sensors):
"""Validate that this sensor doesn't end up referencing itself."""
if self in sensors:
raise ValueError('Circular dependency in sensors: %s is its own'
'parent.' % self.name)
sensors.add(self)
for parent in self._parents:
parent._check_forest(sensors)
@property
def name(self):
"""
The name this sensor is registered with.
This name will be unique among all registered sensors.
"""
return self._name
@property
def metrics(self):
return tuple(self._metrics)
def record(self, value=1.0, time_ms=None):
"""
Record a value at a known time.
Arguments:
value (double): The value we are recording
time_ms (int): The current POSIX time in milliseconds
Raises:
QuotaViolationException: if recording this value moves a
metric beyond its configured maximum or minimum bound
"""
now = time.time() * 1000
if time_ms is None:
time_ms = now
self._last_record_time = now
with self._lock: # XXX high volume, might be performance issue
# increment all the stats
for stat in self._stats:
stat.record(self._config, value, time_ms)
self._check_quotas(time_ms)
for parent in self._parents:
parent.record(value, time_ms)
def _check_quotas(self, time_ms):
"""
Check if we have violated our quota for any metric that
has a configured quota
"""
for metric in self._metrics:
if metric.config and metric.config.quota:
value = metric.value(time_ms)
if not metric.config.quota.is_acceptable(value):
raise QuotaViolationError('(%s) violated quota. Actual: '
'(%d), Threshold: (%d)' %
(metric.metric_name,
metric.config.quota.bound,
value))
def add_compound(self, compound_stat, config=None):
"""
Register a compound statistic with this sensor which
yields multiple measurable quantities (like a histogram)
Arguments:
stat (AbstractCompoundStat): The stat to register
config (MetricConfig): The configuration for this stat.
If None then the stat will use the default configuration
for this sensor.
"""
if not compound_stat:
raise ValueError('compound stat must be non-empty')
self._stats.append(compound_stat)
for named_measurable in compound_stat.stats():
metric = KafkaMetric(named_measurable.name, named_measurable.stat,
config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
def add(self, metric_name, stat, config=None):
"""
Register a metric with this sensor
Arguments:
metric_name (MetricName): The name of the metric
stat (AbstractMeasurableStat): The statistic to keep
config (MetricConfig): A special configuration for this metric.
If None use the sensor default configuration.
"""
with self._lock:
metric = KafkaMetric(metric_name, stat, config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
self._stats.append(stat)
def has_expired(self):
"""
Return True if the Sensor is eligible for removal due to inactivity.
"""
return ((time.time() * 1000 - self._last_record_time) >
self._inactive_sensor_expiration_time_ms)

View File

@@ -0,0 +1,13 @@
from kafka.metrics.measurable_stat import AbstractMeasurableStat
class Total(AbstractMeasurableStat):
"""An un-windowed cumulative total maintained over all time."""
def __init__(self, value=0.0):
self._total = value
def record(self, config, value, now):
self._total += value
def measure(self, config, now):
return float(self._total)

View File

@@ -1,5 +1,6 @@
# pylint: skip-file
from __future__ import absolute_import
import time
import pytest
@@ -14,6 +15,7 @@ from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import Metrics
from kafka.protocol.commit import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
@@ -23,12 +25,14 @@ from kafka.util import WeakMethod
@pytest.fixture
def coordinator(conn):
return ConsumerCoordinator(KafkaClient(), SubscriptionState())
return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(),
'consumer')
def test_init(conn):
cli = KafkaClient()
coordinator = ConsumerCoordinator(cli, SubscriptionState())
coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(),
'consumer')
# metadata update on init
assert cli.cluster._need_update is True
@@ -38,6 +42,7 @@ def test_init(conn):
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(conn, api_version):
coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
Metrics(), 'consumer',
enable_auto_commit=True,
group_id='foobar',
api_version=api_version)
@@ -354,6 +359,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
Metrics(), 'consumer',
api_version=api_version,
enable_auto_commit=enable,
group_id=group_id)
@@ -441,7 +447,7 @@ def test_send_offset_commit_request_failure(patched_coord, offsets):
assert future.exception is error
def test_send_offset_commit_request_success(patched_coord, offsets):
def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
_f = Future()
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_commit_request(offsets)
@@ -449,7 +455,7 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])
_f.success(response)
patched_coord._handle_offset_commit_response.assert_called_with(
offsets, future, response)
offsets, future, mocker.ANY, response)
@pytest.mark.parametrize('response,error,dead,reassign', [
@@ -478,10 +484,11 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
(OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
Errors.TopicAuthorizationFailedError, False, False),
])
def test_handle_offset_commit_response(patched_coord, offsets,
def test_handle_offset_commit_response(mocker, patched_coord, offsets,
response, error, dead, reassign):
future = Future()
patched_coord._handle_offset_commit_response(offsets, future, response)
patched_coord._handle_offset_commit_response(offsets, future, time.time(),
response)
assert isinstance(future.exception, error)
assert patched_coord.coordinator_id is (None if dead else 0)
assert patched_coord._subscription.needs_partition_assignment is reassign

View File

@@ -8,6 +8,7 @@ from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
@@ -29,7 +30,7 @@ def fetcher(client, subscription_state):
subscription_state.assign_from_subscribed(assignment)
for tp in assignment:
subscription_state.seek(tp, 0)
return Fetcher(client, subscription_state)
return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')
def test_init_fetches(fetcher, mocker):

499
test/test_metrics.py Normal file
View File

@@ -0,0 +1,499 @@
import sys
import time
import pytest
from kafka.errors import QuotaViolationError
from kafka.metrics import DictReporter, MetricConfig, MetricName, Metrics, Quota
from kafka.metrics.measurable import AbstractMeasurable
from kafka.metrics.stats import (Avg, Count, Max, Min, Percentile, Percentiles,
Rate, Total)
from kafka.metrics.stats.percentiles import BucketSizing
from kafka.metrics.stats.rate import TimeUnit
EPS = 0.000001
@pytest.fixture
def time_keeper():
return TimeKeeper()
@pytest.fixture
def config():
return MetricConfig()
@pytest.fixture
def reporter():
return DictReporter()
@pytest.fixture
def metrics(request, config, reporter):
metrics = Metrics(config, [reporter], enable_expiration=True)
request.addfinalizer(lambda: metrics.close())
return metrics
def test_MetricName():
# The Java test only cover the differences between the deprecated
# constructors, so I'm skipping them but doing some other basic testing.
# In short, metrics should be equal IFF their name, group, and tags are
# the same. Descriptions do not matter.
name1 = MetricName('name', 'group', 'A metric.', {'a': 1, 'b': 2})
name2 = MetricName('name', 'group', 'A description.', {'a': 1, 'b': 2})
assert name1 == name2
name1 = MetricName('name', 'group', tags={'a': 1, 'b': 2})
name2 = MetricName('name', 'group', tags={'a': 1, 'b': 2})
assert name1 == name2
name1 = MetricName('foo', 'group')
name2 = MetricName('name', 'group')
assert name1 != name2
name1 = MetricName('name', 'foo')
name2 = MetricName('name', 'group')
assert name1 != name2
# name and group must be non-empty. Everything else is optional.
with pytest.raises(Exception):
MetricName('', 'group')
with pytest.raises(Exception):
MetricName('name', None)
# tags must be a dict if supplied
with pytest.raises(Exception):
MetricName('name', 'group', tags=set())
# Because of the implementation of __eq__ and __hash__, the values of
# a MetricName cannot be mutable.
tags = {'a': 1}
name = MetricName('name', 'group', 'description', tags=tags)
with pytest.raises(AttributeError):
name.name = 'new name'
with pytest.raises(AttributeError):
name.group = 'new name'
with pytest.raises(AttributeError):
name.tags = {}
# tags is a copy, so the instance isn't altered
name.tags['b'] = 2
assert name.tags == tags
def test_simple_stats(mocker, time_keeper, config, metrics):
mocker.patch('time.time', side_effect=time_keeper.time)
measurable = ConstantMeasurable()
metrics.add_metric(metrics.metric_name('direct.measurable', 'grp1',
'The fraction of time an appender waits for space allocation.'),
measurable)
sensor = metrics.sensor('test.sensor')
sensor.add(metrics.metric_name('test.avg', 'grp1'), Avg())
sensor.add(metrics.metric_name('test.max', 'grp1'), Max())
sensor.add(metrics.metric_name('test.min', 'grp1'), Min())
sensor.add(metrics.metric_name('test.rate', 'grp1'), Rate(TimeUnit.SECONDS))
sensor.add(metrics.metric_name('test.occurences', 'grp1'),Rate(TimeUnit.SECONDS, Count()))
sensor.add(metrics.metric_name('test.count', 'grp1'), Count())
percentiles = [Percentile(metrics.metric_name('test.median', 'grp1'), 50.0),
Percentile(metrics.metric_name('test.perc99_9', 'grp1'), 99.9)]
sensor.add_compound(Percentiles(100, BucketSizing.CONSTANT, 100, -100,
percentiles=percentiles))
sensor2 = metrics.sensor('test.sensor2')
sensor2.add(metrics.metric_name('s2.total', 'grp1'), Total())
sensor2.record(5.0)
sum_val = 0
count = 10
for i in range(count):
sensor.record(i)
sum_val += i
# prior to any time passing
elapsed_secs = (config.time_window_ms * (config.samples - 1)) / 1000.0
assert abs(count / elapsed_secs -
metrics.metrics.get(metrics.metric_name('test.occurences', 'grp1')).value()) \
< EPS, 'Occurrences(0...%d) = %f' % (count, count / elapsed_secs)
# pretend 2 seconds passed...
sleep_time_seconds = 2.0
time_keeper.sleep(sleep_time_seconds)
elapsed_secs += sleep_time_seconds
assert abs(5.0 - metrics.metrics.get(metrics.metric_name('s2.total', 'grp1')).value()) \
< EPS, 's2 reflects the constant value'
assert abs(4.5 - metrics.metrics.get(metrics.metric_name('test.avg', 'grp1')).value()) \
< EPS, 'Avg(0...9) = 4.5'
assert abs((count - 1) - metrics.metrics.get(metrics.metric_name('test.max', 'grp1')).value()) \
< EPS, 'Max(0...9) = 9'
assert abs(0.0 - metrics.metrics.get(metrics.metric_name('test.min', 'grp1')).value()) \
< EPS, 'Min(0...9) = 0'
assert abs((sum_val / elapsed_secs) - metrics.metrics.get(metrics.metric_name('test.rate', 'grp1')).value()) \
< EPS, 'Rate(0...9) = 1.40625'
assert abs((count / elapsed_secs) - metrics.metrics.get(metrics.metric_name('test.occurences', 'grp1')).value()) \
< EPS, 'Occurrences(0...%d) = %f' % (count, count / elapsed_secs)
assert abs(count - metrics.metrics.get(metrics.metric_name('test.count', 'grp1')).value()) \
< EPS, 'Count(0...9) = 10'
def test_hierarchical_sensors(metrics):
parent1 = metrics.sensor('test.parent1')
parent1.add(metrics.metric_name('test.parent1.count', 'grp1'), Count())
parent2 = metrics.sensor('test.parent2')
parent2.add(metrics.metric_name('test.parent2.count', 'grp1'), Count())
child1 = metrics.sensor('test.child1', parents=[parent1, parent2])
child1.add(metrics.metric_name('test.child1.count', 'grp1'), Count())
child2 = metrics.sensor('test.child2', parents=[parent1])
child2.add(metrics.metric_name('test.child2.count', 'grp1'), Count())
grandchild = metrics.sensor('test.grandchild', parents=[child1])
grandchild.add(metrics.metric_name('test.grandchild.count', 'grp1'), Count())
# increment each sensor one time
parent1.record()
parent2.record()
child1.record()
child2.record()
grandchild.record()
p1 = parent1.metrics[0].value()
p2 = parent2.metrics[0].value()
c1 = child1.metrics[0].value()
c2 = child2.metrics[0].value()
gc = grandchild.metrics[0].value()
# each metric should have a count equal to one + its children's count
assert 1.0 == gc
assert 1.0 + gc == c1
assert 1.0 == c2
assert 1.0 + c1 == p2
assert 1.0 + c1 + c2 == p1
assert [child1, child2] == metrics._children_sensors.get(parent1)
assert [child1] == metrics._children_sensors.get(parent2)
assert metrics._children_sensors.get(grandchild) is None
def test_bad_sensor_hierarchy(metrics):
parent = metrics.sensor('parent')
child1 = metrics.sensor('child1', parents=[parent])
child2 = metrics.sensor('child2', parents=[parent])
with pytest.raises(ValueError):
metrics.sensor('gc', parents=[child1, child2])
def test_remove_sensor(metrics):
size = len(metrics.metrics)
parent1 = metrics.sensor('test.parent1')
parent1.add(metrics.metric_name('test.parent1.count', 'grp1'), Count())
parent2 = metrics.sensor('test.parent2')
parent2.add(metrics.metric_name('test.parent2.count', 'grp1'), Count())
child1 = metrics.sensor('test.child1', parents=[parent1, parent2])
child1.add(metrics.metric_name('test.child1.count', 'grp1'), Count())
child2 = metrics.sensor('test.child2', parents=[parent2])
child2.add(metrics.metric_name('test.child2.count', 'grp1'), Count())
grandchild1 = metrics.sensor('test.gchild2', parents=[child2])
grandchild1.add(metrics.metric_name('test.gchild2.count', 'grp1'), Count())
sensor = metrics.get_sensor('test.parent1')
assert sensor is not None
metrics.remove_sensor('test.parent1')
assert metrics.get_sensor('test.parent1') is None
assert metrics.metrics.get(metrics.metric_name('test.parent1.count', 'grp1')) is None
assert metrics.get_sensor('test.child1') is None
assert metrics._children_sensors.get(sensor) is None
assert metrics.metrics.get(metrics.metric_name('test.child1.count', 'grp1')) is None
sensor = metrics.get_sensor('test.gchild2')
assert sensor is not None
metrics.remove_sensor('test.gchild2')
assert metrics.get_sensor('test.gchild2') is None
assert metrics._children_sensors.get(sensor) is None
assert metrics.metrics.get(metrics.metric_name('test.gchild2.count', 'grp1')) is None
sensor = metrics.get_sensor('test.child2')
assert sensor is not None
metrics.remove_sensor('test.child2')
assert metrics.get_sensor('test.child2') is None
assert metrics._children_sensors.get(sensor) is None
assert metrics.metrics.get(metrics.metric_name('test.child2.count', 'grp1')) is None
sensor = metrics.get_sensor('test.parent2')
assert sensor is not None
metrics.remove_sensor('test.parent2')
assert metrics.get_sensor('test.parent2') is None
assert metrics._children_sensors.get(sensor) is None
assert metrics.metrics.get(metrics.metric_name('test.parent2.count', 'grp1')) is None
assert size == len(metrics.metrics)
def test_remove_inactive_metrics(mocker, time_keeper, metrics):
mocker.patch('time.time', side_effect=time_keeper.time)
s1 = metrics.sensor('test.s1', None, 1)
s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count())
s2 = metrics.sensor('test.s2', None, 3)
s2.add(metrics.metric_name('test.s2.count', 'grp1'), Count())
purger = Metrics.ExpireSensorTask
purger.run(metrics)
assert metrics.get_sensor('test.s1') is not None, \
'Sensor test.s1 must be present'
assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is not None, \
'MetricName test.s1.count must be present'
assert metrics.get_sensor('test.s2') is not None, \
'Sensor test.s2 must be present'
assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \
'MetricName test.s2.count must be present'
time_keeper.sleep(1.001)
purger.run(metrics)
assert metrics.get_sensor('test.s1') is None, \
'Sensor test.s1 should have been purged'
assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is None, \
'MetricName test.s1.count should have been purged'
assert metrics.get_sensor('test.s2') is not None, \
'Sensor test.s2 must be present'
assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \
'MetricName test.s2.count must be present'
# record a value in sensor s2. This should reset the clock for that sensor.
# It should not get purged at the 3 second mark after creation
s2.record()
time_keeper.sleep(2)
purger.run(metrics)
assert metrics.get_sensor('test.s2') is not None, \
'Sensor test.s2 must be present'
assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \
'MetricName test.s2.count must be present'
# After another 1 second sleep, the metric should be purged
time_keeper.sleep(1)
purger.run(metrics)
assert metrics.get_sensor('test.s1') is None, \
'Sensor test.s2 should have been purged'
assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is None, \
'MetricName test.s2.count should have been purged'
# After purging, it should be possible to recreate a metric
s1 = metrics.sensor('test.s1', None, 1)
s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count())
assert metrics.get_sensor('test.s1') is not None, \
'Sensor test.s1 must be present'
assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is not None, \
'MetricName test.s1.count must be present'
def test_remove_metric(metrics):
size = len(metrics.metrics)
metrics.add_metric(metrics.metric_name('test1', 'grp1'), Count())
metrics.add_metric(metrics.metric_name('test2', 'grp1'), Count())
assert metrics.remove_metric(metrics.metric_name('test1', 'grp1')) is not None
assert metrics.metrics.get(metrics.metric_name('test1', 'grp1')) is None
assert metrics.metrics.get(metrics.metric_name('test2', 'grp1')) is not None
assert metrics.remove_metric(metrics.metric_name('test2', 'grp1')) is not None
assert metrics.metrics.get(metrics.metric_name('test2', 'grp1')) is None
assert size == len(metrics.metrics)
def test_event_windowing(mocker, time_keeper):
mocker.patch('time.time', side_effect=time_keeper.time)
count = Count()
config = MetricConfig(event_window=1, samples=2)
count.record(config, 1.0, time_keeper.ms())
count.record(config, 1.0, time_keeper.ms())
assert 2.0 == count.measure(config, time_keeper.ms())
count.record(config, 1.0, time_keeper.ms()) # first event times out
assert 2.0 == count.measure(config, time_keeper.ms())
def test_time_windowing(mocker, time_keeper):
mocker.patch('time.time', side_effect=time_keeper.time)
count = Count()
config = MetricConfig(time_window_ms=1, samples=2)
count.record(config, 1.0, time_keeper.ms())
time_keeper.sleep(.001)
count.record(config, 1.0, time_keeper.ms())
assert 2.0 == count.measure(config, time_keeper.ms())
time_keeper.sleep(.001)
count.record(config, 1.0, time_keeper.ms()) # oldest event times out
assert 2.0 == count.measure(config, time_keeper.ms())
def test_old_data_has_no_effect(mocker, time_keeper):
mocker.patch('time.time', side_effect=time_keeper.time)
max_stat = Max()
min_stat = Min()
avg_stat = Avg()
count_stat = Count()
window_ms = 100
samples = 2
config = MetricConfig(time_window_ms=window_ms, samples=samples)
max_stat.record(config, 50, time_keeper.ms())
min_stat.record(config, 50, time_keeper.ms())
avg_stat.record(config, 50, time_keeper.ms())
count_stat.record(config, 50, time_keeper.ms())
time_keeper.sleep(samples * window_ms / 1000.0)
assert float('-inf') == max_stat.measure(config, time_keeper.ms())
assert float(sys.maxsize) == min_stat.measure(config, time_keeper.ms())
assert 0.0 == avg_stat.measure(config, time_keeper.ms())
assert 0 == count_stat.measure(config, time_keeper.ms())
def test_duplicate_MetricName(metrics):
metrics.sensor('test').add(metrics.metric_name('test', 'grp1'), Avg())
with pytest.raises(ValueError):
metrics.sensor('test2').add(metrics.metric_name('test', 'grp1'), Total())
def test_Quotas(metrics):
sensor = metrics.sensor('test')
sensor.add(metrics.metric_name('test1.total', 'grp1'), Total(),
MetricConfig(quota=Quota.upper_bound(5.0)))
sensor.add(metrics.metric_name('test2.total', 'grp1'), Total(),
MetricConfig(quota=Quota.lower_bound(0.0)))
sensor.record(5.0)
with pytest.raises(QuotaViolationError):
sensor.record(1.0)
assert abs(6.0 - metrics.metrics.get(metrics.metric_name('test1.total', 'grp1')).value()) \
< EPS
sensor.record(-6.0)
with pytest.raises(QuotaViolationError):
sensor.record(-1.0)
def test_Quotas_equality():
quota1 = Quota.upper_bound(10.5)
quota2 = Quota.lower_bound(10.5)
assert quota1 != quota2, 'Quota with different upper values should not be equal'
quota3 = Quota.lower_bound(10.5)
assert quota2 == quota3, 'Quota with same upper and bound values should be equal'
def test_Percentiles(metrics):
buckets = 100
_percentiles = [
Percentile(metrics.metric_name('test.p25', 'grp1'), 25),
Percentile(metrics.metric_name('test.p50', 'grp1'), 50),
Percentile(metrics.metric_name('test.p75', 'grp1'), 75),
]
percs = Percentiles(4 * buckets, BucketSizing.CONSTANT, 100.0, 0.0,
percentiles=_percentiles)
config = MetricConfig(event_window=50, samples=2)
sensor = metrics.sensor('test', config)
sensor.add_compound(percs)
p25 = metrics.metrics.get(metrics.metric_name('test.p25', 'grp1'))
p50 = metrics.metrics.get(metrics.metric_name('test.p50', 'grp1'))
p75 = metrics.metrics.get(metrics.metric_name('test.p75', 'grp1'))
# record two windows worth of sequential values
for i in range(buckets):
sensor.record(i)
assert abs(p25.value() - 25) < 1.0
assert abs(p50.value() - 50) < 1.0
assert abs(p75.value() - 75) < 1.0
for i in range(buckets):
sensor.record(0.0)
assert p25.value() < 1.0
assert p50.value() < 1.0
assert p75.value() < 1.0
def test_rate_windowing(mocker, time_keeper, metrics):
mocker.patch('time.time', side_effect=time_keeper.time)
# Use the default time window. Set 3 samples
config = MetricConfig(samples=3)
sensor = metrics.sensor('test.sensor', config)
sensor.add(metrics.metric_name('test.rate', 'grp1'), Rate(TimeUnit.SECONDS))
sum_val = 0
count = config.samples - 1
# Advance 1 window after every record
for i in range(count):
sensor.record(100)
sum_val += 100
time_keeper.sleep(config.time_window_ms / 1000.0)
# Sleep for half the window.
time_keeper.sleep(config.time_window_ms / 2.0 / 1000.0)
# prior to any time passing
elapsed_secs = (config.time_window_ms * (config.samples - 1) + config.time_window_ms / 2.0) / 1000.0
kafka_metric = metrics.metrics.get(metrics.metric_name('test.rate', 'grp1'))
assert abs((sum_val / elapsed_secs) - kafka_metric.value()) < EPS, \
'Rate(0...2) = 2.666'
assert abs(elapsed_secs - (kafka_metric.measurable.window_size(config, time.time() * 1000) / 1000.0)) \
< EPS, 'Elapsed Time = 75 seconds'
def test_reporter(metrics):
reporter = DictReporter()
foo_reporter = DictReporter(prefix='foo')
metrics.add_reporter(reporter)
metrics.add_reporter(foo_reporter)
sensor = metrics.sensor('kafka.requests')
sensor.add(metrics.metric_name('pack.bean1.avg', 'grp1'), Avg())
sensor.add(metrics.metric_name('pack.bean2.total', 'grp2'), Total())
sensor2 = metrics.sensor('kafka.blah')
sensor2.add(metrics.metric_name('pack.bean1.some', 'grp1'), Total())
sensor2.add(metrics.metric_name('pack.bean2.some', 'grp1',
tags={'a': 42, 'b': 'bar'}), Total())
# kafka-metrics-count > count is the total number of metrics and automatic
expected = {
'kafka-metrics-count': {'count': 5.0},
'grp2': {'pack.bean2.total': 0.0},
'grp1': {'pack.bean1.avg': 0.0, 'pack.bean1.some': 0.0},
'grp1.a=42,b=bar': {'pack.bean2.some': 0.0},
}
assert expected == reporter.snapshot()
for key in list(expected.keys()):
metrics = expected.pop(key)
expected['foo.%s' % key] = metrics
assert expected == foo_reporter.snapshot()
class ConstantMeasurable(AbstractMeasurable):
_value = 0.0
def measure(self, config, now):
return self._value
class TimeKeeper(object):
"""
A clock that you can manually advance by calling sleep
"""
def __init__(self, auto_tick_ms=0):
self._millis = time.time() * 1000
self._auto_tick_ms = auto_tick_ms
def time(self):
return self.ms() / 1000.0
def ms(self):
self.sleep(self._auto_tick_ms)
return self._millis
def sleep(self, seconds):
self._millis += (seconds * 1000)