Collect request stats
This is a thing that nodepool has been doing for ages. With the upcoming changes to remove the task manager, the mechanism it has been using to put activity in the right place isn't going to be available anymore. But also, people using openstacksdk from within a service might also want to be able to do the same logging. This improves upon the old method as well, as it uses the history in the response object to get and report on all of the calls made as part of a request. This will catch things that do auto retries. While we're in there, add support for reporting to prometheus instead. The prometheus support does not read from config, and does not run an http service, since openstacksdk is a library. It is expected that an application that uses openstacksdk and wants request stats collected will pass a prometheus_client.CollectorRegistry to collector_registry. Change-Id: I7218179dd5f0c068a52a4704b2ce1a0942fdc0d1
This commit is contained in:
parent
f9b0911166
commit
c8b96cddd3
@ -24,6 +24,7 @@ os-client-config==1.28.0
|
||||
os-service-types==1.2.0
|
||||
oslotest==3.2.0
|
||||
pbr==2.0.0
|
||||
prometheus-client==0.4.2
|
||||
Pygments==2.2.0
|
||||
python-mimeparse==1.6.0
|
||||
python-subunit==1.0.0
|
||||
@ -32,6 +33,7 @@ requests==2.18.0
|
||||
requests-mock==1.2.0
|
||||
requestsexceptions==1.2.0
|
||||
six==1.10.0
|
||||
statsd==3.3.0
|
||||
stestr==1.0.0
|
||||
stevedore==1.20.0
|
||||
testrepository==0.0.18
|
||||
|
@ -451,6 +451,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
|
||||
interface=self.config.get_interface(service_type),
|
||||
endpoint_override=self.config.get_endpoint(service_type),
|
||||
region_name=self.config.region_name,
|
||||
statsd_prefix=self.config.get_statsd_prefix(),
|
||||
statsd_client=self.config.get_statsd_client(),
|
||||
prometheus_counter=self.config.get_prometheus_counter(),
|
||||
prometheus_histogram=self.config.get_prometheus_histogram(),
|
||||
min_version=request_min_version,
|
||||
max_version=request_max_version)
|
||||
if adapter.get_endpoint():
|
||||
|
@ -21,6 +21,15 @@ from keystoneauth1 import session as ks_session
|
||||
import os_service_types
|
||||
import requestsexceptions
|
||||
from six.moves import urllib
|
||||
try:
|
||||
import statsd
|
||||
except ImportError:
|
||||
statsd = None
|
||||
try:
|
||||
import prometheus_client
|
||||
except ImportError:
|
||||
prometheus_client = None
|
||||
|
||||
|
||||
from openstack import version as openstack_version
|
||||
from openstack import _log
|
||||
@ -96,7 +105,9 @@ class CloudRegion(object):
|
||||
discovery_cache=None, extra_config=None,
|
||||
cache_expiration_time=0, cache_expirations=None,
|
||||
cache_path=None, cache_class='dogpile.cache.null',
|
||||
cache_arguments=None, password_callback=None):
|
||||
cache_arguments=None, password_callback=None,
|
||||
statsd_host=None, statsd_port=None, statsd_prefix=None,
|
||||
collector_registry=None):
|
||||
self._name = name
|
||||
self.region_name = region_name
|
||||
self.config = _util.normalize_keys(config)
|
||||
@ -116,6 +127,11 @@ class CloudRegion(object):
|
||||
self._cache_class = cache_class
|
||||
self._cache_arguments = cache_arguments
|
||||
self._password_callback = password_callback
|
||||
self._statsd_host = statsd_host
|
||||
self._statsd_port = statsd_port
|
||||
self._statsd_prefix = statsd_prefix
|
||||
self._statsd_client = None
|
||||
self._collector_registry = collector_registry
|
||||
|
||||
self._service_type_manager = os_service_types.ServiceTypes()
|
||||
|
||||
@ -471,6 +487,11 @@ class CloudRegion(object):
|
||||
self.get_connect_retries(service_type))
|
||||
kwargs.setdefault('status_code_retries',
|
||||
self.get_status_code_retries(service_type))
|
||||
kwargs.setdefault('statsd_prefix', self.get_statsd_prefix())
|
||||
kwargs.setdefault('statsd_client', self.get_statsd_client())
|
||||
kwargs.setdefault('prometheus_counter', self.get_prometheus_counter())
|
||||
kwargs.setdefault(
|
||||
'prometheus_histogram', self.get_prometheus_histogram())
|
||||
endpoint_override = self.get_endpoint(service_type)
|
||||
version = version_request.version
|
||||
min_api_version = (
|
||||
@ -746,3 +767,61 @@ class CloudRegion(object):
|
||||
def get_concurrency(self, service_type=None):
|
||||
return self._get_service_config(
|
||||
'concurrency', service_type=service_type)
|
||||
|
||||
def get_statsd_client(self):
|
||||
if not statsd:
|
||||
return None
|
||||
statsd_args = {}
|
||||
if self._statsd_host:
|
||||
statsd_args['host'] = self._statsd_host
|
||||
if self._statsd_port:
|
||||
statsd_args['port'] = self._statsd_port
|
||||
if statsd_args:
|
||||
return statsd.StatsClient(**statsd_args)
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_statsd_prefix(self):
|
||||
return self._statsd_prefix or 'openstack.api'
|
||||
|
||||
def get_prometheus_registry(self):
|
||||
if not self._collector_registry and prometheus_client:
|
||||
self._collector_registry = prometheus_client.REGISTRY
|
||||
return self._collector_registry
|
||||
|
||||
def get_prometheus_histogram(self):
|
||||
registry = self.get_prometheus_registry()
|
||||
if not registry or not prometheus_client:
|
||||
return
|
||||
# We have to hide a reference to the histogram on the registry
|
||||
# object, because it's collectors must be singletons for a given
|
||||
# registry but register at creation time.
|
||||
hist = getattr(registry, '_openstacksdk_histogram', None)
|
||||
if not hist:
|
||||
hist = prometheus_client.Histogram(
|
||||
'openstack_http_response_time',
|
||||
'Time taken for an http response to an OpenStack service',
|
||||
labelnames=[
|
||||
'method', 'endpoint', 'service_type', 'status_code'
|
||||
],
|
||||
registry=registry,
|
||||
)
|
||||
registry._openstacksdk_histogram = hist
|
||||
return hist
|
||||
|
||||
def get_prometheus_counter(self):
|
||||
registry = self.get_prometheus_registry()
|
||||
if not registry or not prometheus_client:
|
||||
return
|
||||
counter = getattr(registry, '_openstacksdk_counter', None)
|
||||
if not counter:
|
||||
counter = prometheus_client.Counter(
|
||||
'openstack_http_requests',
|
||||
'Number of HTTP requests made to an OpenStack service',
|
||||
labelnames=[
|
||||
'method', 'endpoint', 'service_type', 'status_code'
|
||||
],
|
||||
registry=registry,
|
||||
)
|
||||
registry._openstacksdk_counter = counter
|
||||
return counter
|
||||
|
@ -140,7 +140,9 @@ class OpenStackConfig(object):
|
||||
envvar_prefix=None, secure_files=None,
|
||||
pw_func=None, session_constructor=None,
|
||||
app_name=None, app_version=None,
|
||||
load_yaml_config=True, load_envvars=True):
|
||||
load_yaml_config=True, load_envvars=True,
|
||||
statsd_host=None, statsd_port=None,
|
||||
statsd_prefix=None):
|
||||
self.log = _log.setup_logging('openstack.config')
|
||||
self._session_constructor = session_constructor
|
||||
self._app_name = app_name
|
||||
@ -276,6 +278,21 @@ class OpenStackConfig(object):
|
||||
self._cache_expirations = cache_settings.get(
|
||||
'expiration', self._cache_expirations)
|
||||
|
||||
if load_yaml_config:
|
||||
statsd_config = self.cloud_config.get('statsd', {})
|
||||
statsd_host = statsd_host or statsd_config.get('host')
|
||||
statsd_port = statsd_port or statsd_config.get('port')
|
||||
statsd_prefix = statsd_prefix or statsd_config.get('prefix')
|
||||
|
||||
if load_envvars:
|
||||
statsd_host = statsd_host or os.environ.get('STATSD_HOST')
|
||||
statsd_port = statsd_port or os.environ.get('STATSD_PORT')
|
||||
statsd_prefix = statsd_prefix or os.environ.get('STATSD_PREFIX')
|
||||
|
||||
self._statsd_host = statsd_host
|
||||
self._statsd_port = statsd_port
|
||||
self._statsd_prefix = statsd_prefix
|
||||
|
||||
# Flag location to hold the peeked value of an argparse timeout value
|
||||
self._argv_timeout = False
|
||||
|
||||
@ -1091,6 +1108,9 @@ class OpenStackConfig(object):
|
||||
cache_class=self._cache_class,
|
||||
cache_arguments=self._cache_arguments,
|
||||
password_callback=self._pw_callback,
|
||||
statsd_host=self._statsd_host,
|
||||
statsd_port=self._statsd_port,
|
||||
statsd_prefix=self._statsd_prefix,
|
||||
)
|
||||
# TODO(mordred) Backwards compat for OSC transition
|
||||
get_one_cloud = get_one
|
||||
|
@ -59,7 +59,9 @@ def _extract_name(url, service_type=None):
|
||||
# Strip leading version piece so that
|
||||
# GET /v2.0/networks
|
||||
# returns ['networks']
|
||||
if url_parts[0] in ('v1', 'v2', 'v2.0'):
|
||||
if (url_parts[0]
|
||||
and url_parts[0][0] == 'v'
|
||||
and url_parts[0][1] and url_parts[0][1].isdigit()):
|
||||
url_parts = url_parts[1:]
|
||||
name_parts = []
|
||||
# Pull out every other URL portion - so that
|
||||
@ -118,12 +120,21 @@ class Proxy(adapter.Adapter):
|
||||
``<service-type>_status_code_retries``.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
session,
|
||||
statsd_client=None, statsd_prefix=None,
|
||||
prometheus_counter=None, prometheus_histogram=None,
|
||||
*args, **kwargs):
|
||||
# NOTE(dtantsur): keystoneauth defaults retriable_status_codes to None,
|
||||
# override it with a class-level value.
|
||||
kwargs.setdefault('retriable_status_codes',
|
||||
self.retriable_status_codes)
|
||||
super(Proxy, self).__init__(*args, **kwargs)
|
||||
super(Proxy, self).__init__(session=session, *args, **kwargs)
|
||||
self._statsd_client = statsd_client
|
||||
self._statsd_prefix = statsd_prefix
|
||||
self._prometheus_counter = prometheus_counter
|
||||
self._prometheus_histogram = prometheus_histogram
|
||||
|
||||
def request(
|
||||
self, url, method, error_message=None,
|
||||
@ -132,8 +143,36 @@ class Proxy(adapter.Adapter):
|
||||
url, method,
|
||||
connect_retries=connect_retries, raise_exc=False,
|
||||
**kwargs)
|
||||
for h in response.history:
|
||||
self._report_stats(h)
|
||||
self._report_stats(response)
|
||||
return response
|
||||
|
||||
def _report_stats(self, response):
|
||||
if self._statsd_client:
|
||||
self._report_stats_statsd(response)
|
||||
if self._prometheus_counter and self._prometheus_histogram:
|
||||
self._report_stats_prometheus(response)
|
||||
|
||||
def _report_stats_statsd(self, response):
|
||||
name_parts = _extract_name(response.request.url, self.service_type)
|
||||
key = '.'.join(
|
||||
[self._statsd_prefix, self.service_type, response.request.method]
|
||||
+ name_parts)
|
||||
self._statsd_client.timing(key, int(response.elapsed.seconds * 1000))
|
||||
self._statsd_client.incr(key)
|
||||
|
||||
def _report_stats_prometheus(self, response):
|
||||
labels = dict(
|
||||
method=response.request.method,
|
||||
endpoint=response.request.url,
|
||||
service_type=self.service_type,
|
||||
status_code=response.status_code,
|
||||
)
|
||||
self._prometheus_counter.labels(**labels).inc()
|
||||
self._prometheus_histogram.labels(**labels).observe(
|
||||
response.elapsed.seconds)
|
||||
|
||||
def _version_matches(self, version):
|
||||
api_version = self.get_api_major_version()
|
||||
if api_version:
|
||||
|
268
openstack/tests/unit/test_stats.py
Normal file
268
openstack/tests/unit/test_stats.py
Normal file
@ -0,0 +1,268 @@
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import os
|
||||
import pprint
|
||||
import threading
|
||||
import time
|
||||
import select
|
||||
import socket
|
||||
|
||||
import fixtures
|
||||
import prometheus_client
|
||||
import testtools.content
|
||||
|
||||
from openstack.tests.unit import base
|
||||
|
||||
|
||||
class StatsdFixture(fixtures.Fixture):
|
||||
def _setUp(self):
|
||||
self.running = True
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.thread.daemon = True
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.sock.bind(('', 0))
|
||||
self.port = self.sock.getsockname()[1]
|
||||
self.wake_read, self.wake_write = os.pipe()
|
||||
self.stats = []
|
||||
self.thread.start()
|
||||
self.addCleanup(self._cleanup)
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
poll = select.poll()
|
||||
poll.register(self.sock, select.POLLIN)
|
||||
poll.register(self.wake_read, select.POLLIN)
|
||||
ret = poll.poll()
|
||||
for (fd, event) in ret:
|
||||
if fd == self.sock.fileno():
|
||||
data = self.sock.recvfrom(1024)
|
||||
if not data:
|
||||
return
|
||||
self.stats.append(data[0])
|
||||
if fd == self.wake_read:
|
||||
return
|
||||
|
||||
def _cleanup(self):
|
||||
self.running = False
|
||||
os.write(self.wake_write, b'1\n')
|
||||
self.thread.join()
|
||||
|
||||
|
||||
class TestStats(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.statsd = StatsdFixture()
|
||||
self.useFixture(self.statsd)
|
||||
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
|
||||
# see: https://github.com/jsocol/pystatsd/issues/61
|
||||
self.useFixture(
|
||||
fixtures.EnvironmentVariable('STATSD_HOST', '127.0.0.1'))
|
||||
self.useFixture(
|
||||
fixtures.EnvironmentVariable('STATSD_PORT', str(self.statsd.port)))
|
||||
|
||||
self.add_info_on_exception('statsd_content', self.statsd.stats)
|
||||
# Set up the above things before the super setup so that we have the
|
||||
# environment variables set when the Connection is created.
|
||||
super(TestStats, self).setUp()
|
||||
|
||||
self._registry = prometheus_client.CollectorRegistry()
|
||||
self.cloud.config._collector_registry = self._registry
|
||||
self.addOnException(self._add_prometheus_samples)
|
||||
|
||||
def _add_prometheus_samples(self, exc_info):
|
||||
samples = []
|
||||
for metric in self._registry.collect():
|
||||
for s in metric.samples:
|
||||
samples.append(s)
|
||||
self.addDetail(
|
||||
'prometheus_samples',
|
||||
testtools.content.text_content(pprint.pformat(samples)))
|
||||
|
||||
def assert_reported_stat(self, key, value=None, kind=None):
|
||||
"""Check statsd output
|
||||
|
||||
Check statsd return values. A ``value`` should specify a
|
||||
``kind``, however a ``kind`` may be specified without a
|
||||
``value`` for a generic match. Leave both empy to just check
|
||||
for key presence.
|
||||
|
||||
:arg str key: The statsd key
|
||||
:arg str value: The expected value of the metric ``key``
|
||||
:arg str kind: The expected type of the metric ``key`` For example
|
||||
|
||||
- ``c`` counter
|
||||
- ``g`` gauge
|
||||
- ``ms`` timing
|
||||
- ``s`` set
|
||||
"""
|
||||
|
||||
self.assertIsNotNone(self.statsd)
|
||||
|
||||
if value:
|
||||
self.assertNotEqual(kind, None)
|
||||
|
||||
start = time.time()
|
||||
while time.time() < (start + 1):
|
||||
# Note our fake statsd just queues up results in a queue.
|
||||
# We just keep going through them until we find one that
|
||||
# matches, or fail out. If statsd pipelines are used,
|
||||
# large single packets are sent with stats separated by
|
||||
# newlines; thus we first flatten the stats out into
|
||||
# single entries.
|
||||
stats = itertools.chain.from_iterable(
|
||||
[s.decode('utf-8').split('\n') for s in self.statsd.stats])
|
||||
for stat in stats:
|
||||
k, v = stat.split(':')
|
||||
if key == k:
|
||||
if kind is None:
|
||||
# key with no qualifiers is found
|
||||
return True
|
||||
|
||||
s_value, s_kind = v.split('|')
|
||||
|
||||
# if no kind match, look for other keys
|
||||
if kind != s_kind:
|
||||
continue
|
||||
|
||||
if value:
|
||||
# special-case value|ms because statsd can turn
|
||||
# timing results into float of indeterminate
|
||||
# length, hence foiling string matching.
|
||||
if kind == 'ms':
|
||||
if float(value) == float(s_value):
|
||||
return True
|
||||
if value == s_value:
|
||||
return True
|
||||
# otherwise keep looking for other matches
|
||||
continue
|
||||
|
||||
# this key matches
|
||||
return True
|
||||
time.sleep(0.1)
|
||||
|
||||
raise Exception("Key %s not found in reported stats" % key)
|
||||
|
||||
def assert_prometheus_stat(self, name, value, labels=None):
|
||||
sample_value = self._registry.get_sample_value(name, labels)
|
||||
self.assertEqual(sample_value, value)
|
||||
|
||||
def test_list_projects(self):
|
||||
|
||||
mock_uri = self.get_mock_url(
|
||||
service_type='identity', interface='admin', resource='projects',
|
||||
base_url_append='v3')
|
||||
|
||||
self.register_uris([
|
||||
dict(method='GET', uri=mock_uri, status_code=200,
|
||||
json={'projects': []})])
|
||||
|
||||
self.cloud.list_projects()
|
||||
self.assert_calls()
|
||||
|
||||
self.assert_reported_stat(
|
||||
'openstack.api.identity.GET.projects', value='1', kind='c')
|
||||
self.assert_prometheus_stat(
|
||||
'openstack_http_requests_total', 1, dict(
|
||||
service_type='identity',
|
||||
endpoint=mock_uri,
|
||||
method='GET',
|
||||
status_code='200'))
|
||||
|
||||
def test_projects(self):
|
||||
mock_uri = self.get_mock_url(
|
||||
service_type='identity', interface='admin', resource='projects',
|
||||
base_url_append='v3')
|
||||
|
||||
self.register_uris([
|
||||
dict(method='GET', uri=mock_uri, status_code=200,
|
||||
json={'projects': []})])
|
||||
|
||||
list(self.cloud.identity.projects())
|
||||
self.assert_calls()
|
||||
|
||||
self.assert_reported_stat(
|
||||
'openstack.api.identity.GET.projects', value='1', kind='c')
|
||||
self.assert_prometheus_stat(
|
||||
'openstack_http_requests_total', 1, dict(
|
||||
service_type='identity',
|
||||
endpoint=mock_uri,
|
||||
method='GET',
|
||||
status_code='200'))
|
||||
|
||||
def test_servers(self):
|
||||
|
||||
mock_uri = 'https://compute.example.com/v2.1/servers/detail'
|
||||
|
||||
self.register_uris([
|
||||
dict(method='GET', uri=mock_uri, status_code=200,
|
||||
json={'servers': []})])
|
||||
|
||||
list(self.cloud.compute.servers())
|
||||
self.assert_calls()
|
||||
|
||||
self.assert_reported_stat(
|
||||
'openstack.api.compute.GET.servers.detail', value='1', kind='c')
|
||||
self.assert_prometheus_stat(
|
||||
'openstack_http_requests_total', 1, dict(
|
||||
service_type='compute',
|
||||
endpoint=mock_uri,
|
||||
method='GET',
|
||||
status_code='200'))
|
||||
|
||||
def test_servers_no_detail(self):
|
||||
|
||||
mock_uri = 'https://compute.example.com/v2.1/servers'
|
||||
|
||||
self.register_uris([
|
||||
dict(method='GET', uri=mock_uri, status_code=200,
|
||||
json={'servers': []})])
|
||||
|
||||
self.cloud.compute.get('/servers')
|
||||
self.assert_calls()
|
||||
|
||||
self.assert_reported_stat(
|
||||
'openstack.api.compute.GET.servers', value='1', kind='c')
|
||||
self.assert_prometheus_stat(
|
||||
'openstack_http_requests_total', 1, dict(
|
||||
service_type='compute',
|
||||
endpoint=mock_uri,
|
||||
method='GET',
|
||||
status_code='200'))
|
||||
|
||||
|
||||
class TestNoStats(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestNoStats, self).setUp()
|
||||
self.statsd = StatsdFixture()
|
||||
self.useFixture(self.statsd)
|
||||
|
||||
def test_no_stats(self):
|
||||
|
||||
mock_uri = self.get_mock_url(
|
||||
service_type='identity', interface='admin', resource='projects',
|
||||
base_url_append='v3')
|
||||
|
||||
self.register_uris([
|
||||
dict(method='GET', uri=mock_uri, status_code=200,
|
||||
json={'projects': []})])
|
||||
|
||||
self.cloud.identity._statsd_client = None
|
||||
list(self.cloud.identity.projects())
|
||||
self.assert_calls()
|
||||
self.assertEqual([], self.statsd.stats)
|
5
releasenotes/notes/request-stats-9d70480bebbdb4d6.yaml
Normal file
5
releasenotes/notes/request-stats-9d70480bebbdb4d6.yaml
Normal file
@ -0,0 +1,5 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Added support for collecting and reporting stats on calls made to
|
||||
statsd and prometheus.
|
@ -8,9 +8,11 @@ extras>=1.0.0 # MIT
|
||||
fixtures>=3.0.0 # Apache-2.0/BSD
|
||||
jsonschema<3.0.0,>=2.6.0 # MIT
|
||||
mock>=2.0.0 # BSD
|
||||
prometheus-client>=0.4.2 # Apache-2.0
|
||||
python-subunit>=1.0.0 # Apache-2.0/BSD
|
||||
oslotest>=3.2.0 # Apache-2.0
|
||||
requests-mock>=1.2.0 # Apache-2.0
|
||||
statsd>=3.3.0
|
||||
stestr>=1.0.0 # Apache-2.0
|
||||
testrepository>=0.0.18 # Apache-2.0/BSD
|
||||
testscenarios>=0.4 # Apache-2.0/BSD
|
||||
|
Loading…
x
Reference in New Issue
Block a user