stats: API for native labeled metrics

Introduce a LabeledStatsdClient API; no callers yet.

Include three config options:
  - statsd_label_mode, which specifies which label format to use
  - statsd_emit_legacy, which dictates whether to emit old-style
    metrics dotted metrics
  - statsd_user_label_<name> = <value>, which supports user defined
    labels in restricted ASCII characters

Co-Authored-By: yanxiao@nvidia.com
Co-Authored-By: alistairncoles@gmail.com

Change-Id: I115ffb1dc601652a979895d7944e011b951a91c1
This commit is contained in:
Tim Burke
2023-09-29 17:21:08 -07:00
committed by Shreeya Deshpande
parent 05143a99f8
commit 7e5235894b
7 changed files with 986 additions and 63 deletions

View File

@@ -817,6 +817,10 @@ Reporting Metrics to StatsD
.. highlight:: cfg
.. note::
The legacy statsd metrics described in this section are being supplemented
with :doc:`metrics/labels`.
If you have a StatsD_ server running, Swift may be configured to send it
real-time operational metrics. To enable this, set the following
configuration entries (see the sample configuration files)::

View File

@@ -0,0 +1,69 @@
:orphan:
Labeled Metrics
===============
.. note::
Labeled metrics are still an experimental feature. This document contains
forward looking statements that anticipate future development of labeled
metrics support. In particular, metric names and labels may be subject to
change as we explore the space.
.. warning::
Enabling labeled metrics will likely cause a dramatic increase in the number
of distinct metrics time series. Ensure your metrics pipeline is prepared.
Recent versions of Swift emit StatsD metrics with explicit application-defined
labels, rather than relying on consumers knowing how to unpack the legacy label
names. A variety of StatsD extension formats are available, many of which are
parsed by `statsd_exporter <https://github.com/prometheus/statsd_exporter/>`__:
- ``librato``
- ``influxdb``
- ``dogstatsd``
- ``signalfx``
- ``graphite``
See the ``proxy-server.conf-sample`` file for more information on configuring
labeled metrics.
Labeled metrics are emitted in addition to legacy StatsD metrics. However,
legacy StatsD metrics can be disabled by setting the ``statsd_emit_legacy``
option to ``False``. This is not recommended until more legacy metrics have
been supplemented with equivalent labeled metrics.
As various Swift middlewares, services and daemons are upgraded to emit labeled
metrics, they will be documented in the relevant section of the :doc:`all`
page.
Common Labels
-------------
Each labeled metric may have its own unique labels, but many labeled metrics
will use some or all of a common set of labels. The common labels are
documented here for information purposes, but the authoritative set of labels
for each metric can be found in the sections of the :doc:`all` page.
.. table::
:align: left
================ ==========================================================
Label Name Value
---------------- ----------------------------------------------------------
``type`` The type of resource associated with the metric
i.e. ``account``, ``container`` or ``object``.
``account`` The quoted account name associated with the metric.
``container`` The quoted container name associated with the metric.
``policy`` The storage policy index associated with the metric.
``status`` The status int of an HTTP response associated with the
metric.
``method`` The method of an HTTP request associated with the metric.
================ ==========================================================
.. note::
Note that metrics will *not* have labels that would likely have a very high
cardinality of values, such as object names, as this is expected to be
problematic for metrics collectors. Nevertheless, some operators may still
need to drop labels such as ``container`` in order to keep metric
cardinalities reasonable.

View File

@@ -69,6 +69,51 @@ bind_port = 8080
# log_statsd_sample_rate_factor = 1.0
# log_statsd_metric_prefix =
#
# Statsd metrics may include labeling information in a variety of formats.
# Available options:
# disabled, dogstatsd, graphite, influxdb, librato, signalfx.
# Defaults to disabled; enable statsd_label_mode by setting another option.
# See also: https://github.com/prometheus/statsd_exporter#tagging-extensions.
# Note that enabling statsd_label_mode will likely increase the number of time
# series stored, as more labeled metrics may be exposed than may have been
# previously extracted from the dotted non-labeled legacy metric format.
# statsd_label_mode = disabled
#
# Historically, statsd metrics were emitted with implied labels as part of
# metric name in a dotted "legacy" format. Once swift is fully instrumented
# with labeled metrics, and you have statsd_label_mode enabled, you may want to
# turn off legacy metrics; to do that set this option to False. Defaults to
# True.
# statsd_emit_legacy = True
#
# Statsd metrics emitted with labels also support user defined labels
# configured by options. The format for each option is:
# statsd_user_label_<name> = <value>
# where <name> and <value> are restricted to a subset of non-whitespace ASCII
# characters, including letters (upper and lower), numbers and underscores.
# <value> may also contain the period character (.). Each option will add a
# label with name user_<name> and value <value> to labeled metrics.
# User defined labels may be configured in this [DEFAULT] section, in which
# case they will be included with every labeled metric, or they may be
# configured in individual [filter:<middleware>] sections, in which case they
# will only be included with labeled metrics emitted by that <middleware>.
# For example, a proxy-server configuration could use the following to
# delineate labeled metrics emitted by different instances of proxy-logging
# middleware in the pipeline:
# [filter:subrequest-logging]
# use = egg:swift#proxy_logging
# statsd_user_label_reqctx = subrequest
# which adds a label with name 'user_reqctx' and value 'subrequest' to every
# labeled metrics emitted by this proxy-logging instance. This would achieve
# similar effect as the following proxy-server configuration for legacy
# non-labeled metrics:
# [filter:subrequest-logging]
# use = egg:swift#proxy_logging
# access_log_statsd_metric_prefix = subrequest
# Note that the legacy metrics option 'access_log_statsd_metric_prefix' does
# not apply to labeled metrics.
# By default there are no user defined labels.
#
# List of origin hosts that are allowed for CORS requests in addition to what
# the container has set.
# Use a comma separated list of full URL (http://foo.bar:1234,https://foo.bar)

View File

@@ -17,11 +17,91 @@
import time
import warnings
import re
from contextlib import closing
from random import random
from eventlet.green import socket
from swift.common.utils.config import config_true_value
STATSD_CONF_USER_LABEL_PREFIX = 'statsd_user_label_'
STATSD_USER_LABEL_NAMESPACE = 'user_'
USER_LABEL_PATTERN = re.compile(r"[^0-9a-zA-Z_]")
USER_VALUE_PATTERN = re.compile(r"[^0-9a-zA-Z_.]")
def _build_line_parts(metric, value, metric_type, sample_rate):
line = '%s:%s|%s' % (metric, value, metric_type)
if sample_rate < 1:
line += '|@%s' % (sample_rate,)
return line
class LabeledFormats:
disabled = None
@staticmethod
def librato(metric, value, metric_type, sample_rate, labels):
# https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags
if labels:
metric += '#' + ','.join('%s=%s' % (k, v) for k, v in labels)
line = _build_line_parts(metric, value, metric_type, sample_rate)
return line
@staticmethod
def influxdb(metric, value, metric_type, sample_rate, labels):
# https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd
if labels:
metric += ''.join(',%s=%s' % (k, v) for k, v in labels)
line = _build_line_parts(metric, value, metric_type, sample_rate)
return line
@staticmethod
def signalfx(metric, value, metric_type, sample_rate, labels):
# https://web.archive.org/web/20211123040355/https://docs.signalfx.com/en/latest/integrations/agent/monitors/collectd-statsd.html#adding-dimensions-to-statsd-metrics
# https://docs.splunk.com/Observability/gdi/statsd/statsd.html#adding-dimensions-to-statsd-metrics
if labels:
metric += '[%s]' % ','.join('%s=%s' % (k, v) for k, v in labels)
line = _build_line_parts(metric, value, metric_type, sample_rate)
return line
@staticmethod
def graphite(metric, value, metric_type, sample_rate, labels):
# https://graphite.readthedocs.io/en/latest/tags.html#carbon
if labels:
metric += ''.join(';%s=%s' % (k, v) for k, v in labels)
line = _build_line_parts(metric, value, metric_type, sample_rate)
return line
@staticmethod
def dogstatsd(metric, value, metric_type, sample_rate, labels):
# https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics
line = _build_line_parts(metric, value, metric_type, sample_rate)
if labels:
line += '|#' + ','.join('%s:%s' % (k, v) for k, v in labels)
return line
def _get_labeled_statsd_formatter(label_mode):
"""
Returns a label formatting function for the given ``label_mode``.
:param label_mode: A label mode.
:raises ValueError: if ``label_mode`` is not supported by ``LabelFormats``.
:returns: a label formatting function.
"""
try:
return getattr(LabeledFormats, label_mode)
except AttributeError:
label_modes = [
f for f in LabeledFormats.__dict__
if not f.startswith('__')]
raise ValueError(
'unknown statsd_label_mode %r; '
'expected one of %r' % (label_mode, label_modes))
def get_statsd_client(conf=None, tail_prefix='', logger=None):
"""
@@ -34,12 +114,12 @@ def get_statsd_client(conf=None, tail_prefix='', logger=None):
log_statsd_default_sample_rate = 1.0
log_statsd_sample_rate_factor = 1.0
log_statsd_metric_prefix = (empty-string)
statsd_emit_legacy = true
:param conf: Configuration dict to read settings from
:param tail_prefix: tail prefix to pass to statsd client
:param logger: stdlib logger instance used by statsd client for logging
:return: an instance of ``StatsdClient``
"""
conf = conf or {}
@@ -51,23 +131,98 @@ def get_statsd_client(conf=None, tail_prefix='', logger=None):
sample_rate_factor = float(
conf.get('log_statsd_sample_rate_factor', 1))
return StatsdClient(host, port, base_prefix=base_prefix,
tail_prefix=tail_prefix,
default_sample_rate=default_sample_rate,
sample_rate_factor=sample_rate_factor, logger=logger)
emit_legacy = config_true_value(conf.get(
'statsd_emit_legacy', 'true'))
return StatsdClient(
host, port,
base_prefix=base_prefix,
tail_prefix=tail_prefix,
default_sample_rate=default_sample_rate,
sample_rate_factor=sample_rate_factor,
emit_legacy=emit_legacy,
logger=logger)
class StatsdClient(object):
def __init__(self, host, port, base_prefix='', tail_prefix='',
default_sample_rate=1, sample_rate_factor=1, logger=None):
def get_labeled_statsd_client(conf=None, logger=None):
"""
Get an instance of LabeledStatsdClient using config settings.
**config and defaults**::
log_statsd_host = (disabled)
log_statsd_port = 8125
log_statsd_default_sample_rate = 1.0
log_statsd_sample_rate_factor = 1.0
statsd_label_mode = disabled
:param conf: Configuration dict to read settings from
:param logger: stdlib logger instance used by statsd client for logging
:return: an instance of ``LabeledStatsdClient``
"""
conf = conf or {}
host = conf.get('log_statsd_host')
port = int(conf.get('log_statsd_port', 8125))
default_sample_rate = float(
conf.get('log_statsd_default_sample_rate', 1))
sample_rate_factor = float(
conf.get('log_statsd_sample_rate_factor', 1))
label_mode = conf.get(
'statsd_label_mode', 'disabled').lower()
default_labels = {}
for k, v in conf.items():
if not k.startswith(STATSD_CONF_USER_LABEL_PREFIX):
continue
conf_label = k[len(STATSD_CONF_USER_LABEL_PREFIX):]
result = USER_LABEL_PATTERN.search(conf_label)
if result is not None:
raise ValueError(
'invalid character in statsd user label '
'configuration {0!r}: {1!r}'.format(
k, result.group(0)))
result = USER_VALUE_PATTERN.search(v)
if result is not None:
raise ValueError(
'invalid character in configuration {0!r} '
'value {1!r}: {2!r}'.format(
k, v, result.group(0)))
conf_label = STATSD_USER_LABEL_NAMESPACE + conf_label
default_labels[conf_label] = v
return LabeledStatsdClient(
host, port,
default_sample_rate=default_sample_rate,
sample_rate_factor=sample_rate_factor,
label_mode=label_mode,
default_labels=default_labels,
logger=logger)
class AbstractStatsdClient:
"""
Base class to facilitate sending metrics to a socket. Sub-classes are
responsible for formatting metrics lines.
:param host: Statsd host name. If ``None`` then metrics are not sent.
:param port: Statsd host port.
:param default_sample_rate: The default rate at which metrics should be
sampled if no sample rate is otherwise specified. Should be a float
value between 0 and 1.
:param sample_rate_factor: A multiplier to apply to the rate at which
metrics are sampled. Should be a float value between 0 and 1.
:param logger: A stdlib logger instance.
"""
def __init__(self, host, port, default_sample_rate=1,
sample_rate_factor=1, logger=None):
self._host = host
self._port = port
self._base_prefix = base_prefix
self._default_sample_rate = default_sample_rate
self._sample_rate_factor = sample_rate_factor
self.random = random
self.logger = logger
self._set_prefix(tail_prefix)
self._sock_family = self._target = None
if self._host:
@@ -109,6 +264,111 @@ class StatsdClient(object):
else:
self._target = (host, port)
def _is_emitted(self, sample_rate):
"""
Adjust the given ``sample_rate`` by the configured
``sample_rate_factor`` and, based on the adjusted sample rate,
determine if a stat should be emitted on this occasion.
Sub-classes should call this method before sending a metric line with
``_send_line``.
:param sample_rate: The sample_rate given in the call to emit a stat.
If ``None`` then this will default to the configured
``default_sample_rate``.
:returns: a tuple ``(<boolean>, <adjusted_sample_rate>)``. The boolean
is ``True`` if a stat should be emitted on this occasion, ``False``
otherwise.
"""
if not self._host:
# StatsD not configured
return False, None
if sample_rate is None:
sample_rate = self._default_sample_rate
adjusted_sample_rate = sample_rate * self._sample_rate_factor
if adjusted_sample_rate < 1 and self.random() >= adjusted_sample_rate:
return False, None
return True, adjusted_sample_rate
def _send_line(self, line):
"""
Send a ``line`` of metrics to socket.
Sub-classes should call ``_is_emitted`` before calling this method.
:param line: The string to be sent to the socket. If ``None`` then
nothing is sent.
"""
if line is None:
return
# Ideally, we'd cache a sending socket in self, but that
# results in a socket getting shared by multiple green threads.
with closing(self._open_socket()) as sock:
try:
return sock.sendto(line.encode('utf-8'), self._target)
except IOError as err:
if self.logger:
self.logger.warning(
'Error sending UDP message to %(target)r: %(err)s',
{'target': self._target, 'err': err})
def _open_socket(self):
return socket.socket(self._sock_family, socket.SOCK_DGRAM)
class StatsdClient(AbstractStatsdClient):
"""
A legacy statsd client. This client does not support labeled metrics.
A prefix may be specified using the ``base_prefix`` and ``tail_prefix``
arguments. The prefix is added to the name of every metric such that the
emitted metric name has the form:
[<base_prefix>.][tail_prefix.]<metric name>
:param host: Statsd host name. If ``None`` then metrics are not sent.
:param port: Statsd host port.
:param base_prefix: (optional) A string that will form the first part of a
prefix added to each metric name. The prefix is separated from the
metric name by a '.' character.
:param tail_prefix: (optional) A string that will form the second part of a
prefix added to each metric name. The prefix is separated from the
metric name by a '.' character.
:param default_sample_rate: The default rate at which metrics should be
sampled if no sample rate is otherwise specified. Should be a float
value between 0 and 1.
:param sample_rate_factor: A multiplier to apply to the rate at which
metrics are sampled. Should be a float value between 0 and 1.
:param emit_legacy: if ``True`` then the client will emit metrics; if
``False`` then the client will emit no metrics.
:param logger: A stdlib logger instance.
"""
def __init__(self, host, port,
base_prefix='', tail_prefix='',
default_sample_rate=1, sample_rate_factor=1,
emit_legacy=True,
logger=None):
super().__init__(
host, port, default_sample_rate, sample_rate_factor, logger)
self._base_prefix = base_prefix
self.emit_legacy = emit_legacy
self._set_prefix(tail_prefix)
def _send(self, metric, value, metric_type, sample_rate):
is_emitted, adjusted_sample_rate = self._is_emitted(sample_rate)
if self.emit_legacy and is_emitted:
metric = self._prefix + metric
line = _build_line_parts(
metric, value, metric_type, adjusted_sample_rate)
return self._send_line(line)
def _set_prefix(self, tail_prefix):
"""
Modifies the prefix that is added to metric names. The resulting prefix
@@ -148,38 +408,8 @@ class StatsdClient(object):
)
self._set_prefix(tail_prefix)
def _send(self, m_name, m_value, m_type, sample_rate):
if not self._host:
# StatsD not configured
return
if sample_rate is None:
sample_rate = self._default_sample_rate
sample_rate = sample_rate * self._sample_rate_factor
parts = ['%s%s:%s' % (self._prefix, m_name, m_value), m_type]
if sample_rate < 1:
if self.random() < sample_rate:
parts.append('@%s' % (sample_rate,))
else:
return
parts = [part.encode('utf-8') for part in parts]
# Ideally, we'd cache a sending socket in self, but that
# results in a socket getting shared by multiple green threads.
with closing(self._open_socket()) as sock:
try:
return sock.sendto(b'|'.join(parts), self._target)
except IOError as err:
if self.logger:
self.logger.warning(
'Error sending UDP message to %(target)r: %(err)s',
{'target': self._target, 'err': err})
def _open_socket(self):
return socket.socket(self._sock_family, socket.SOCK_DGRAM)
def update_stats(self, m_name, m_value, sample_rate=None):
return self._send(m_name, m_value, 'c', sample_rate)
def update_stats(self, metric, value, sample_rate=None):
return self._send(metric, value, 'c', sample_rate)
def increment(self, metric, sample_rate=None):
return self.update_stats(metric, 1, sample_rate)
@@ -195,11 +425,103 @@ class StatsdClient(object):
return self._timing(metric, timing_ms, sample_rate)
def timing_since(self, metric, orig_time, sample_rate=None):
return self._timing(metric, (time.time() - orig_time) * 1000,
sample_rate)
return self._timing(
metric, (time.time() - orig_time) * 1000, sample_rate)
def transfer_rate(self, metric, elapsed_time, byte_xfer, sample_rate=None):
if byte_xfer:
return self.timing(metric,
elapsed_time * 1000 / byte_xfer * 1000,
sample_rate)
return self.timing(
metric, elapsed_time * 1000 / byte_xfer * 1000, sample_rate)
class LabeledStatsdClient(AbstractStatsdClient):
"""
A statsd client that supports annotating metrics with labels.
Labeled metrics can be emitted in the style of Graphite, Librato, InfluxDB,
DogStatsD, or SignalFX by specifying the corresponding ``label_mode`` when
constructing a client. If ``label_mode`` is ``disabled`` then no metrics
are emitted by the client.
Label keys should contain only ASCII letters ('a-z', 'A-Z'), digits
('0-9') and the underscore character ('_'). Label values may also contain
the period ('.') character.
Callers should avoid using labels that have a high cardinality of values
since this may result in an unreasonable number of distinct time series for
collectors to maintain. For example, labels should NOT be used for object
names or transaction ids.
:param host: Statsd host name. If ``None`` then metrics are not sent.
:param port: Statsd host port.
:param default_sample_rate: The default rate at which metrics should be
sampled if no sample rate is otherwise specified. Should be a float
value between 0 and 1.
:param sample_rate_factor: A multiplier to apply to the rate at which
metrics are sampled. Should be a float value between 0 and 1.
:param label_mode: one of 'graphite', 'dogstatsd', 'signalfx', 'librato',
'influxdb' or 'disabled'.
:param default_labels: a dictionary of labels that will be added to every
metric emitted by the client.
:param logger: A stdlib logger instance.
"""
def __init__(self, host, port,
default_sample_rate=1, sample_rate_factor=1,
label_mode='disabled', default_labels=None,
logger=None):
super().__init__(
host, port, default_sample_rate, sample_rate_factor, logger)
self.default_labels = default_labels or {}
self.label_formatter = _get_labeled_statsd_formatter(label_mode)
if self.logger:
self.logger.debug('Labeled statsd mode: %s (%s)',
label_mode, self.logger.name)
def _send(self, metric, value, metric_type, sample_rate, labels=None):
if not self.label_formatter:
return
is_emitted, adjusted_sample_rate = self._is_emitted(sample_rate)
if is_emitted:
return self._send_line(self._build_line(
metric, value, metric_type, adjusted_sample_rate, labels))
def _build_line(self, metric, value, metric_type, sample_rate, labels):
all_labels = dict(self.default_labels)
if labels:
all_labels.update(labels)
return self.label_formatter(
metric,
value,
metric_type,
sample_rate,
sorted(all_labels.items()))
def update_stats(self, metric, value, labels=None, sample_rate=None):
return self._send(metric, value, 'c', sample_rate, labels=labels)
def increment(self, metric, labels=None, sample_rate=None):
return self.update_stats(metric, 1, labels, sample_rate)
def decrement(self, metric, labels=None, sample_rate=None):
return self.update_stats(metric, -1, labels, sample_rate)
def _timing(self, metric, timing_ms, labels, sample_rate):
# This method was added to disagregate timing metrics when testing
return self._send(metric, round(timing_ms, 4), 'ms',
sample_rate, labels=labels)
def timing(self, metric, timing_ms, labels=None, sample_rate=None):
return self._timing(metric, timing_ms, labels, sample_rate)
def timing_since(self, metric, orig_time, labels=None, sample_rate=None):
return self._timing(
metric, (time.time() - orig_time) * 1000,
labels, sample_rate)
def transfer_rate(self, metric, elapsed_time, byte_xfer,
labels=None, sample_rate=None):
if byte_xfer:
return self.timing(
metric, elapsed_time * 1000 / byte_xfer * 1000,
labels, sample_rate)

View File

@@ -155,7 +155,7 @@ class FakeLogger(logging.Logger, CaptureLog):
def __init__(self, *args, **kwargs):
self._clear()
self.name = 'swift.unit.fake_logger'
self.name = kwargs.get('name') or 'swift.unit.fake_logger'
self.level = logging.NOTSET
if 'facility' in kwargs:
self.facility = kwargs['facility']
@@ -270,9 +270,10 @@ class DebugLogAdapter(utils.logs.SwiftLogAdapter):
return getattr(self.__dict__['logger'], name)
def debug_logger(name='test'):
def debug_logger(name='test', log_route=None):
"""get a named adapted debug logger"""
adapted_logger = DebugLogAdapter(DebugLogger(), name)
log_route = log_route or name
adapted_logger = DebugLogAdapter(DebugLogger(name=log_route), name)
utils._patch_statsd_methods(adapted_logger, adapted_logger.logger)
return adapted_logger

View File

@@ -27,7 +27,8 @@ from queue import Queue, Empty
from swift.common import statsd_client
from swift.common.statsd_client import StatsdClient, get_statsd_client
from swift.common.statsd_client import StatsdClient, get_statsd_client, \
LabeledFormats
from test.debug_logger import debug_logger
@@ -76,6 +77,9 @@ class BaseTestStatsdClient(unittest.TestCase):
class TestStatsdClient(BaseTestStatsdClient):
"""
Tests here construct a StatsdClient directly.
"""
def test_init_host(self):
client = StatsdClient('myhost', 1234)
self.assertEqual([('myhost', 1234)], self.getaddrinfo_calls)
@@ -130,7 +134,10 @@ class TestStatsdClient(BaseTestStatsdClient):
self.assertEqual('some-name.more-specific.', client._prefix)
class TestModuleFunctions(BaseTestStatsdClient):
class TestGetStatsdClientConfParsing(BaseTestStatsdClient):
"""
Tests here use get_statsd_client to make a StatsdClient.
"""
def test_get_statsd_client_defaults(self):
# no options configured
client = statsd_client.get_statsd_client({})
@@ -155,6 +162,7 @@ class TestModuleFunctions(BaseTestStatsdClient):
'log_statsd_default_sample_rate': '3.3',
'log_statsd_sample_rate_factor': '4.4',
'log_junk': 'ignored',
'statsd_label_mode': 'dogstatsd', # ignored
}
client = statsd_client.get_statsd_client(
conf, tail_prefix='milkshake', logger=self.logger)
@@ -169,6 +177,198 @@ class TestModuleFunctions(BaseTestStatsdClient):
warn_lines = self.logger.get_lines_for_level('warning')
self.assertEqual([], warn_lines)
def test_emit_legacy(self):
conf = {
'log_statsd_host': 'myhost',
'log_statsd_port': '1234',
}
client = statsd_client.get_statsd_client(conf)
with mock.patch.object(client, '_open_socket') as mock_open:
client.increment('tunafish')
self.assertEqual(mock_open.mock_calls, [
mock.call(),
mock.call().sendto(b'tunafish:1|c', ('myhost', 1234)),
mock.call().close(),
])
conf = {
'log_statsd_host': 'myhost',
'log_statsd_port': '1234',
'statsd_emit_legacy': 'False',
}
client = statsd_client.get_statsd_client(conf)
with mock.patch.object(client, '_open_socket') as mock_open:
client.increment('tunafish')
self.assertEqual(mock_open.mock_calls, [])
class TestGetLabeledStatsdClientConfParsing(BaseTestStatsdClient):
"""
Tests here use get_labeled_statsd_client to make a LabeledStatsdClient.
"""
def test_conf_defaults(self):
# no options configured
client = statsd_client.get_labeled_statsd_client({})
self.assertIsInstance(client, statsd_client.LabeledStatsdClient)
self.assertIsNone(client._host)
self.assertEqual(8125, client._port)
self.assertEqual(1.0, client._default_sample_rate)
self.assertEqual(1.0, client._sample_rate_factor)
self.assertIsNone(client.logger)
with mock.patch.object(client, '_open_socket') as mock_open:
client.increment('tunafish', {})
self.assertFalse(mock_open.mock_calls)
def test_conf_non_defaults(self):
# legacy options...
conf = {
'log_statsd_host': 'example.com',
'log_statsd_port': '6789',
'log_statsd_default_sample_rate': '3.3',
'log_statsd_sample_rate_factor': '4.4',
'log_junk': 'ignored',
'statsd_emit_legacy': 'False', # ignored
}
client = statsd_client.get_labeled_statsd_client(
conf, logger=self.logger)
self.assertIsInstance(client, statsd_client.LabeledStatsdClient)
self.assertEqual('example.com', client._host)
self.assertEqual(6789, client._port)
self.assertEqual(3.3, client._default_sample_rate)
self.assertEqual(4.4, client._sample_rate_factor)
self.assertEqual(self.logger, client.logger)
warn_lines = self.logger.get_lines_for_level('warning')
self.assertEqual([], warn_lines)
def test_invalid_label_mode(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': '1234',
'statsd_label_mode': 'invalid',
}
with self.assertRaises(ValueError) as cm:
statsd_client.get_labeled_statsd_client(conf, self.logger)
self.assertIn("unknown statsd_label_mode 'invalid'", str(cm.exception))
def test_valid_label_mode(self):
conf = {'statsd_label_mode': 'dogstatsd'}
logger = debug_logger(log_route='my-log-route')
client = statsd_client.get_labeled_statsd_client(conf, logger)
self.assertEqual(LabeledFormats.dogstatsd, client.label_formatter)
log_lines = logger.get_lines_for_level('debug')
self.assertEqual(1, len(log_lines))
self.assertEqual(
'Labeled statsd mode: dogstatsd (my-log-route)', log_lines[0])
def test_disabled_by_default(self):
conf = {}
logger = debug_logger(log_route='my-log-route')
client = statsd_client.get_labeled_statsd_client(conf, logger)
self.assertIsNone(client.label_formatter)
log_lines = logger.get_lines_for_level('debug')
self.assertEqual(1, len(log_lines))
self.assertEqual(
'Labeled statsd mode: disabled (my-log-route)', log_lines[0])
def test_label_values_to_str(self):
# verify that simple non-str types can be passed as label values
conf = {
'log_statsd_host': 'myhost1',
'log_statsd_port': 1235,
'statsd_label_mode': 'librato',
}
client = statsd_client.get_labeled_statsd_client(conf)
labels = {'bool': True, 'number': 42.1, 'null': None}
with mock.patch.object(client, '_send_line') as mocked:
client.update_stats('metric', '10', labels=labels)
self.assertEqual(
[mock.call('metric#bool=True,null=None,number=42.1:10|c')],
mocked.call_args_list)
def test_user_label(self):
conf = {
'log_statsd_host': 'myhost1',
'log_statsd_port': 1235,
'statsd_label_mode': 'librato',
'statsd_user_label_foo': 'foo.bar.com',
}
client = statsd_client.get_labeled_statsd_client(conf)
self.assertEqual({'user_foo': 'foo.bar.com'}, client.default_labels)
with mock.patch.object(client, '_send_line') as mocked:
client.update_stats('metric', '10', labels={'app': 'value'})
self.assertEqual(
[mock.call('metric#app=value,user_foo=foo.bar.com:10|c')],
mocked.call_args_list)
def test_user_label_overridden_by_call_label(self):
conf = {
'log_statsd_host': 'myhost1',
'log_statsd_port': 1235,
'statsd_label_mode': 'librato',
'statsd_user_label_foo': 'foo',
}
client = statsd_client.get_labeled_statsd_client(conf)
self.assertEqual({'user_foo': 'foo'}, client.default_labels)
with mock.patch.object(client, '_send_line') as mocked:
client.update_stats('metric', '10', labels={'user_foo': 'bar'})
self.assertEqual(
[mock.call('metric#user_foo=bar:10|c')],
mocked.call_args_list)
def test_user_label_sorting(self):
conf = {
'log_statsd_host': 'myhost1',
'log_statsd_port': 1235,
'statsd_label_mode': 'librato',
'statsd_user_label_foo': 'middle',
}
labels = {'z': 'last', 'a': 'first'}
client = statsd_client.get_labeled_statsd_client(conf)
with mock.patch.object(client, '_send_line') as mocked:
client.update_stats('metric', '10', labels=labels)
self.assertEqual(
[mock.call('metric#a=first,user_foo=middle,z=last:10|c')],
mocked.call_args_list)
def test_user_label_invalid_chars(self):
invalid = ',|=[]:.'
for c in invalid:
user_label = 'statsd_user_label_foo%sbar' % c
conf = {
'log_statsd_host': 'myhost1',
'log_statsd_port': 1235,
'statsd_label_mode': 'librato',
user_label: 'buz',
}
with self.assertRaises(ValueError) as ctx:
statsd_client.get_labeled_statsd_client(conf)
self.assertEqual("invalid character in statsd "
"user label configuration "
"'%s': '%s'" % (user_label, c),
str(ctx.exception))
def test_user_label_value_invalid_chars(self):
invalid = ',|=[]:'
for c in invalid:
label_value = 'bar%sbaz' % c
conf = {
'log_statsd_host': 'myhost1',
'log_statsd_port': 1235,
'statsd_label_mode': 'librato',
'statsd_user_label_foo': label_value
}
with self.assertRaises(ValueError) as ctx:
statsd_client.get_labeled_statsd_client(conf)
self.assertEqual("invalid character in configuration "
"'statsd_user_label_foo' value "
"'%s': '%s'" % (label_value, c),
str(ctx.exception))
class TestGetStatsdClientSocket(BaseTestStatsdClient):
def make_test_client(self, conf, *args, **kwargs):
return statsd_client.get_statsd_client(conf, *args, **kwargs)
def test_ipv4_or_ipv6_hostname_defaults_to_ipv4(self):
def stub_getaddrinfo_both_ipv4_and_ipv6(host, port, family, *rest):
if family == socket.AF_INET:
@@ -182,7 +382,7 @@ class TestModuleFunctions(BaseTestStatsdClient):
with mock.patch.object(statsd_client.socket, 'getaddrinfo',
new=stub_getaddrinfo_both_ipv4_and_ipv6):
client = get_statsd_client({
client = self.make_test_client({
'log_statsd_host': 'localhost',
'log_statsd_port': '9876',
}, 'some-name', logger=self.logger)
@@ -194,7 +394,7 @@ class TestModuleFunctions(BaseTestStatsdClient):
self.assertEqual(got_sock.family, socket.AF_INET)
def test_ipv4_instantiation_and_socket_creation(self):
client = get_statsd_client({
client = self.make_test_client({
'log_statsd_host': '127.0.0.1',
'log_statsd_port': '9876',
}, 'some-name', logger=self.logger)
@@ -233,7 +433,7 @@ class TestModuleFunctions(BaseTestStatsdClient):
with mock.patch.object(statsd_client.socket,
'getaddrinfo', fake_getaddrinfo):
client = get_statsd_client({
client = self.make_test_client({
'log_statsd_host': '::1',
'log_statsd_port': '9876',
}, 'some-name', logger=self.logger)
@@ -248,7 +448,7 @@ class TestModuleFunctions(BaseTestStatsdClient):
stub_err = statsd_client.socket.gaierror('whoops')
with mock.patch.object(statsd_client.socket, 'getaddrinfo',
side_effect=stub_err):
client = get_statsd_client({
client = self.make_test_client({
'log_statsd_host': 'i-am-not-a-hostname-or-ip',
'log_statsd_port': '9876',
}, 'some-name', logger=self.logger)
@@ -264,6 +464,16 @@ class TestModuleFunctions(BaseTestStatsdClient):
# statsd sends will warn in the logs until the DNS failure or invalid
# IP address in the configuration is fixed.
class TestGetLabeledStatsdClientSocket(TestGetStatsdClientSocket):
def make_test_client(self, conf, *args, logger=None):
return statsd_client.get_labeled_statsd_client(conf, logger=logger)
class TestGetStatsdClientSending(BaseTestStatsdClient):
"""
Tests here use get_statsd_client to make a StatsdClient.
"""
def test_sending_ipv6(self):
def fake_getaddrinfo(host, port, *args):
# this is what a real getaddrinfo('::1', port,
@@ -361,7 +571,7 @@ class TestModuleFunctions(BaseTestStatsdClient):
self.assertTrue(payload.endswith(suffix), payload)
class TestStatsdClientOutput(unittest.TestCase):
class BaseTestStatsdClientOutput(unittest.TestCase):
def setUp(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -374,13 +584,13 @@ class TestStatsdClientOutput(unittest.TestCase):
self.client = None
def tearDown(self):
# The "no-op when disabled" test doesn't set up a real logger, so
# The "no-op when disabled" test doesn't set up a real client, so
# create one here so we can tell the reader thread to stop.
if not self.client:
self.client = get_statsd_client({
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
}, 'some-name')
})
self.client.increment('STOP')
self.reader_thread.join(timeout=4)
self.sock.close()
@@ -405,20 +615,25 @@ class TestStatsdClientOutput(unittest.TestCase):
while not got:
sender_fn(*args, **kwargs)
try:
got = self.queue.get(timeout=0.5)
got = self.queue.get(timeout=0.3)
except Empty:
pass
return got
return got.decode('utf-8')
def assertStat(self, expected, sender_fn, *args, **kwargs):
got = self._send_and_get(sender_fn, *args, **kwargs).decode('utf-8')
got = self._send_and_get(sender_fn, *args, **kwargs)
return self.assertEqual(expected, got)
def assertStatMatches(self, expected_regexp, sender_fn, *args, **kwargs):
got = self._send_and_get(sender_fn, *args, **kwargs).decode('utf-8')
got = self._send_and_get(sender_fn, *args, **kwargs)
return self.assertTrue(re.search(expected_regexp, got),
[got, expected_regexp])
class TestGetStatsdClientOutput(BaseTestStatsdClientOutput):
"""
Tests here use get_statsd_client to make a StatsdClient.
"""
def test_methods_are_no_ops_when_not_enabled(self):
self.client = get_statsd_client({
# No "log_statsd_host" means "disabled"
@@ -449,6 +664,7 @@ class TestStatsdClientOutput(unittest.TestCase):
self.client = get_statsd_client({
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': 'disabled', # ignored
}, 'some-name')
self.assertStat('some-name.some.counter:1|c', self.client.increment,
'some.counter')
@@ -592,3 +808,267 @@ class TestStatsdClientOutput(unittest.TestCase):
self.assertStat('alpha.beta.another.counter:3|c|@0.9912',
self.client.update_stats, 'another.counter', 3,
sample_rate=0.9912)
def test_statsd_methods_legacy_disabled(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'log_statsd_metric_prefix': 'my_prefix',
'statsd_emit_legacy': 'false',
}
statsd = statsd_client.get_statsd_client(conf, tail_prefix='pfx')
with mock.patch.object(statsd, '_open_socket') as mock_open:
statsd.increment('some.counter')
statsd.decrement('some.counter')
statsd.timing('some.timing', 6.28 * 1000)
statsd.update_stats('some.stat', 3)
self.assertFalse(mock_open.mock_calls)
class TestGetLabeledStatsdClientOutput(BaseTestStatsdClientOutput):
"""
Tests here use get_labeled_statsd_client to make a LabeledStatsdClient.
"""
def test_statsd_methods_disabled(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'log_statsd_metric_prefix': 'my_prefix',
'statsd_label_mode': 'disabled',
}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
with mock.patch.object(labeled_statsd,
'_open_socket') as mock_open:
# Any labeled-metrics callers should not emit any metrics
labeled_statsd.increment('the_counter', labels=labels)
labeled_statsd.decrement('the_counter', labels=labels)
labeled_statsd.timing('the_timing', 6.28 * 1000, labels=labels)
labeled_statsd.update_stats('the_stat', 3, labels=labels)
self.assertFalse(mock_open.mock_calls)
def test_statsd_methods_dogstatsd(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': 'dogstatsd',
'statsd_emit_legacy': 'false', # ignored
}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter:1|c|#action:some,result:ok',
labeled_statsd.increment, 'the_counter', labels=labels)
self.assertStat(
'the_counter:-1|c|#action:some,result:ok',
labeled_statsd.decrement, 'the_counter', labels=labels)
self.assertStat(
'the_timing:6280.0|ms'
'|#action:some,result:ok',
labeled_statsd.timing, 'the_timing', 6.28 * 1000, labels=labels)
self.assertStat(
'the_stat:3|c|#action:some,result:ok',
labeled_statsd.update_stats, 'the_stat', 3, labels=labels)
def test_statsd_methods_dogstatsd_sample_rate(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': 'dogstatsd',
'log_statsd_default_sample_rate': '0.9',
'log_statsd_sample_rate_factor': '0.5'}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter:1|c|@0.45|#action:some,result:ok',
labeled_statsd.increment, 'the_counter', labels=labels)
def test_statsd_methods_graphite(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'log_statsd_metric_prefix': 'my_prefix',
'statsd_label_mode': 'graphite',
}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter;action=some;result=ok:1|c',
labeled_statsd.increment, 'the_counter', labels=labels)
self.assertStat(
'the_counter;action=some;result=ok:-1|c',
labeled_statsd.decrement, 'the_counter', labels=labels)
self.assertStat(
'the_timing;action=some;result=ok'
':6280.0|ms',
labeled_statsd.timing, 'the_timing', 6.28 * 1000, labels=labels)
self.assertStat(
'the_stat;action=some;result=ok:3|c',
labeled_statsd.update_stats, 'the_stat', 3, labels=labels)
def test_statsd_methods_graphite_sample_rate(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': 'graphite',
'log_statsd_default_sample_rate': '0.9',
'log_statsd_sample_rate_factor': '0.5'}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter;action=some;result=ok:1|c|@0.45',
labeled_statsd.increment, 'the_counter', labels=labels)
def test_statsd_methods_influxdb(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'log_statsd_metric_prefix': 'my_prefix',
'statsd_label_mode': 'influxdb',
}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter,action=some,result=ok:1|c',
labeled_statsd.increment, 'the_counter', labels=labels)
self.assertStat(
'the_counter,action=some,result=ok:-1|c',
labeled_statsd.decrement, 'the_counter', labels=labels)
self.assertStat(
'the_counter,action=some,result=ok:-1|c',
labeled_statsd.decrement, 'the_counter', labels=labels)
self.assertStat(
'the_timing,action=some,result=ok'
':6280.0|ms',
labeled_statsd.timing, 'the_timing', 6.28 * 1000, labels=labels)
self.assertStat(
'the_stat,action=some,result=ok:3|c',
labeled_statsd.update_stats, 'the_stat', 3, labels=labels)
def test_statsd_methods_influxdb_sample_rate(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': 'influxdb',
'log_statsd_default_sample_rate': '0.9',
'log_statsd_sample_rate_factor': '0.5'}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the.counter,action=some,result=ok:1|c|@0.45',
labeled_statsd.increment, 'the.counter', labels=labels)
def test_statsd_methods_librato(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'log_statsd_metric_prefix': 'my_prefix',
'statsd_label_mode': 'librato',
}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter#action=some,result=ok:1|c',
labeled_statsd.increment, 'the_counter', labels=labels)
self.assertStat(
'the_counter#action=some,result=ok:-1|c',
labeled_statsd.decrement, 'the_counter', labels=labels)
self.assertStat(
'the_timing#action=some,result=ok'
':6280.0|ms',
labeled_statsd.timing, 'the_timing', 6.28 * 1000, labels=labels)
self.assertStat(
'the_stat#action=some,result=ok:3|c',
labeled_statsd.update_stats, 'the_stat', 3, labels=labels)
def test_statsd_methods_librato_sample_rate(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': 'librato',
'log_statsd_default_sample_rate': '0.9',
'log_statsd_sample_rate_factor': '0.5'}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter#action=some,result=ok:1|c|@0.45',
labeled_statsd.increment, 'the_counter', labels=labels)
def test_statsd_methods_signalfx(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'log_statsd_metric_prefix': 'my_prefix',
'statsd_label_mode': 'signalfx',
}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter[action=some,result=ok]:1|c',
labeled_statsd.increment, 'the_counter', labels=labels)
self.assertStat(
'the_counter[action=some,result=ok]:-1|c',
labeled_statsd.decrement, 'the_counter', labels=labels)
self.assertStat(
'the_timing[action=some,result=ok]'
':6280.0|ms',
labeled_statsd.timing, 'the_timing', 6.28 * 1000, labels=labels)
self.assertStat(
'the_stat[action=some,result=ok]:3|c',
labeled_statsd.update_stats, 'the_stat', 3, labels=labels)
def test_statsd_methods_signalfx_sample_rate(self):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': 'signalfx',
'log_statsd_default_sample_rate': '0.9',
'log_statsd_sample_rate_factor': '0.5'}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
labels = {'action': 'some', 'result': 'ok'}
self.assertStat(
'the_counter[action=some,result=ok]:1|c|@0.45',
labeled_statsd.increment, 'the_counter', labels=labels)
def _do_test_statsd_methods_no_labels(self, label_mode):
conf = {
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'statsd_label_mode': label_mode,
}
labeled_statsd = statsd_client.get_labeled_statsd_client(conf)
self.assertStat('the.counter:1|c',
labeled_statsd.increment, 'the.counter', labels={})
self.assertStat('the.counter:-1|c',
labeled_statsd.decrement, 'the.counter', labels={})
self.assertStat(
'the.timing:6280.0|ms',
labeled_statsd.timing, 'the.timing', 6.28 * 1000, labels={})
self.assertStat('the.stat:3|c',
labeled_statsd.update_stats, 'the.stat', 3, labels={})
self.assertStat('the.counter:1|c',
labeled_statsd.increment, 'the.counter')
self.assertStat('the.counter:-1|c',
labeled_statsd.decrement, 'the.counter')
self.assertStat('the.timing:6280.0|ms',
labeled_statsd.timing, 'the.timing', 6.28 * 1000)
self.assertStat('the.stat:3|c',
labeled_statsd.update_stats, 'the.stat', 3)
self.assertStat('the.stat:500.0|ms',
labeled_statsd.transfer_rate, 'the.stat', 3.3, 6600)
def test_statsd_methods_dogstatsd_no_labels(self):
self._do_test_statsd_methods_no_labels('dogstatsd')
def test_statsd_methods_graphite_no_labels(self):
self._do_test_statsd_methods_no_labels('graphite')
def test_statsd_methods_influxdb_no_labels(self):
self._do_test_statsd_methods_no_labels('influxdb')
def test_statsd_methods_librato_no_labels(self):
self._do_test_statsd_methods_no_labels('librato')
def test_statsd_methods_signalfx_no_labels(self):
self._do_test_statsd_methods_no_labels('signalfx')

View File

@@ -2292,6 +2292,7 @@ class TestProxyServerConfigLoading(unittest.TestCase):
self.assertEqual(app.logger.logger.statsd_client._port, 8125)
self.assertEqual(app.logger.logger.statsd_client._prefix,
'proxy-server.')
self.assertTrue(app.logger.logger.statsd_client.emit_legacy)
conf_sections = """
[DEFAULT]
@@ -2320,6 +2321,7 @@ class TestProxyServerConfigLoading(unittest.TestCase):
self.assertEqual(app.logger.logger.statsd_client._port, 8125)
self.assertEqual(app.logger.logger.statsd_client._prefix,
'proxy-server.')
self.assertTrue(app.logger.logger.statsd_client.emit_legacy)
class TestProxyServerConfigStringLoading(TestProxyServerConfigLoading):