trying to use locks a little less. still not ideal.
This commit is contained in:
@@ -12,7 +12,7 @@ class DictReporter(AbstractMetricsReporter):
|
||||
Store all metrics in a two level dictionary of category > name > metric.
|
||||
"""
|
||||
def __init__(self, prefix=''):
|
||||
self._lock = threading.RLock()
|
||||
self._lock = threading.Lock()
|
||||
self._prefix = prefix if prefix else '' # never allow None
|
||||
self._store = {}
|
||||
|
||||
@@ -28,14 +28,13 @@ class DictReporter(AbstractMetricsReporter):
|
||||
}
|
||||
"""
|
||||
return dict((category, dict((name, metric.value())
|
||||
for name, metric in metrics.items()))
|
||||
for name, metric in list(metrics.items())))
|
||||
for category, metrics in
|
||||
self._store.items())
|
||||
list(self._store.items()))
|
||||
|
||||
def init(self, metrics):
|
||||
with self._lock:
|
||||
for metric in metrics:
|
||||
self.metric_change(metric)
|
||||
for metric in metrics:
|
||||
self.metric_change(metric)
|
||||
|
||||
def metric_change(self, metric):
|
||||
with self._lock:
|
||||
|
||||
@@ -2,13 +2,13 @@ import time
|
||||
|
||||
|
||||
class KafkaMetric(object):
|
||||
def __init__(self, lock, metric_name, measurable, config):
|
||||
# NOTE java constructor takes a lock instance
|
||||
def __init__(self, metric_name, measurable, config):
|
||||
if not metric_name:
|
||||
raise ValueError('metric_name must be non-empty')
|
||||
if not measurable:
|
||||
raise ValueError('measurable must be non-empty')
|
||||
self._metric_name = metric_name
|
||||
self._lock = lock
|
||||
self._measurable = measurable
|
||||
self._config = config
|
||||
|
||||
@@ -26,11 +26,9 @@ class KafkaMetric(object):
|
||||
|
||||
@config.setter
|
||||
def config(self, config):
|
||||
with self._lock:
|
||||
self._config = config
|
||||
self._config = config
|
||||
|
||||
def value(self, time_ms=None):
|
||||
if time_ms is None:
|
||||
# with (self._lock): This doesn't seem necessary?
|
||||
time_ms = time.time() * 1000
|
||||
return self.measurable.measure(self.config, time_ms)
|
||||
|
||||
@@ -134,6 +134,10 @@ class Metrics(object):
|
||||
Returns:
|
||||
Sensor: The sensor that is created
|
||||
"""
|
||||
sensor = self.get_sensor(name)
|
||||
if sensor:
|
||||
return sensor
|
||||
|
||||
with self._lock:
|
||||
sensor = self.get_sensor(name)
|
||||
if not sensor:
|
||||
@@ -185,10 +189,9 @@ class Metrics(object):
|
||||
config (MetricConfig, optional): The configuration to use when
|
||||
measuring this measurable
|
||||
"""
|
||||
with self._lock:
|
||||
metric = KafkaMetric(threading.Lock(), metric_name, measurable,
|
||||
config or self.config)
|
||||
self.register_metric(metric)
|
||||
# NOTE there was a lock here, but i don't think it's needed
|
||||
metric = KafkaMetric(metric_name, measurable, config or self.config)
|
||||
self.register_metric(metric)
|
||||
|
||||
def remove_metric(self, metric_name):
|
||||
"""
|
||||
|
||||
@@ -103,8 +103,8 @@ class Sensor(object):
|
||||
raise ValueError('compound stat must be non-empty')
|
||||
self._stats.append(compound_stat)
|
||||
for named_measurable in compound_stat.stats():
|
||||
metric = KafkaMetric(self._lock, named_measurable.name,
|
||||
named_measurable.stat, config or self._config)
|
||||
metric = KafkaMetric(named_measurable.name, named_measurable.stat,
|
||||
config or self._config)
|
||||
self._registry.register_metric(metric)
|
||||
self._metrics.append(metric)
|
||||
|
||||
@@ -119,8 +119,7 @@ class Sensor(object):
|
||||
If None use the sensor default configuration.
|
||||
"""
|
||||
with self._lock:
|
||||
metric = KafkaMetric(threading.Lock(), metric_name, stat,
|
||||
config or self._config)
|
||||
metric = KafkaMetric(metric_name, stat, config or self._config)
|
||||
self._registry.register_metric(metric)
|
||||
self._metrics.append(metric)
|
||||
self._stats.append(stat)
|
||||
|
||||
Reference in New Issue
Block a user