Rework statistics reporting

- Add support for reporting statistics to InfluxDB
- Move stats configuration under 'metrics' section of the yaml file
- fix metric name build when url contains project id
- report timing in milliseconds
- add initial docs for metrics reporting
- fix metric names in some weird cases
- allow individual proxy to override metrics naming logic

Change-Id: I76d2d78dc2f4c8cecbf89b8cc101c2bb1dec1a2b
This commit is contained in:
Artem Goncharov 2019-06-07 20:12:35 +02:00
parent 1ddca04a52
commit da45a449dc
12 changed files with 352 additions and 74 deletions

View File

@ -0,0 +1,59 @@
====================
Statistics reporting
====================
`openstacksdk` offers possibility to report statistics on individual API
requests/responses in different formats. `Statsd` allows reporting of the
response times in the statsd format. `InfluxDB` allows a more event-oriented
reporting of the same data. `Prometheus` reporting is a bit different and
requires the application using SDK to take care of the metrics exporting, while
`openstacksdk` prepares the metrics.
Due to the nature of the `statsd` protocol lots of tools consuming the metrics
do the data aggregation and processing in the configurable time frame (mean
value calculation for a 1 minute time frame). For the case of periodic tasks
this might not be very useful. A better fit for using `openstacksdk` as a
library is an 'event'-recording, where duration of an individual request is
stored and all required calculations are done if necessary in the monitoring
system based required timeframe, or the data is simply shown as is with no
analytics. A `comparison
<https://prometheus.io/docs/introduction/comparison/>`_ article describes
differences in those approaches.
Simple Usage
------------
To receive metrics add a following section to the config file (clouds.yaml):
.. code-block:: yaml
metrics:
statsd:
host: __statsd_server_host__
port: __statsd_server_port__
clouds:
..
In order to enable InfluxDB reporting following configuration need to be done
in the `clouds.yaml` file
.. code-block:: yaml
metrics:
influxdb:
host: __influxdb_server_host__
port: __influxdb_server_port__
use_udp: __True|False__
username: __influxdb_auth_username__
password: __influxdb_auth_password__
database: __influxdb_db_name__
measurement: __influxdb_measurement_name__
timeout: __infludb_requests_timeout__
clouds:
..
Metrics will be reported only when corresponding client libraries (
`statsd` for 'statsd' reporting, `influxdb` for influxdb reporting
correspondingly). When those libraries are not available reporting will be
silently ignored.

View File

@ -33,6 +33,7 @@ approach, this is where you'll want to begin.
Connect to an OpenStack Cloud <guides/connect>
Connect to an OpenStack Cloud Using a Config File <guides/connect_from_config>
Logging <guides/logging>
Statistics reporting <guides/stats>
Microversions <microversions>
Baremetal <guides/baremetal>
Block Storage <guides/block_storage>

View File

@ -363,6 +363,7 @@ class _OpenStackCloudMixin(object):
statsd_client=self.config.get_statsd_client(),
prometheus_counter=self.config.get_prometheus_counter(),
prometheus_histogram=self.config.get_prometheus_histogram(),
influxdb_client=self.config.get_influxdb_client(),
min_version=request_min_version,
max_version=request_max_version)
if adapter.get_endpoint():

View File

@ -30,6 +30,10 @@ try:
import prometheus_client
except ImportError:
prometheus_client = None
try:
import influxdb
except ImportError:
influxdb = None
from openstack import version as openstack_version
@ -218,6 +222,7 @@ class CloudRegion(object):
cache_path=None, cache_class='dogpile.cache.null',
cache_arguments=None, password_callback=None,
statsd_host=None, statsd_port=None, statsd_prefix=None,
influxdb_config=None,
collector_registry=None):
self._name = name
self.config = _util.normalize_keys(config)
@ -246,6 +251,8 @@ class CloudRegion(object):
self._statsd_port = statsd_port
self._statsd_prefix = statsd_prefix
self._statsd_client = None
self._influxdb_config = influxdb_config
self._influxdb_client = None
self._collector_registry = collector_registry
self._service_type_manager = os_service_types.ServiceTypes()
@ -646,6 +653,8 @@ class CloudRegion(object):
kwargs.setdefault('prometheus_counter', self.get_prometheus_counter())
kwargs.setdefault(
'prometheus_histogram', self.get_prometheus_histogram())
kwargs.setdefault('influxdb_config', self._influxdb_config)
kwargs.setdefault('influxdb_client', self.get_influxdb_client())
endpoint_override = self.get_endpoint(service_type)
version = version_request.version
min_api_version = (
@ -921,7 +930,11 @@ class CloudRegion(object):
if self._statsd_port:
statsd_args['port'] = self._statsd_port
if statsd_args:
return statsd.StatsClient(**statsd_args)
try:
return statsd.StatsClient(**statsd_args)
except Exception:
self.log.warning('Cannot establish connection to statsd')
return None
else:
return None
@ -988,3 +1001,29 @@ class CloudRegion(object):
service_type = service_type.lower().replace('-', '_')
d_key = _make_key('disabled_reason', service_type)
return self.config.get(d_key)
def get_influxdb_client(self):
influx_args = {}
if not self._influxdb_config:
return None
use_udp = bool(self._influxdb_config.get('use_udp', False))
port = self._influxdb_config.get('port')
if use_udp:
influx_args['use_udp'] = True
if 'port' in self._influxdb_config:
if use_udp:
influx_args['udp_port'] = port
else:
influx_args['port'] = port
for key in ['host', 'username', 'password', 'database', 'timeout']:
if key in self._influxdb_config:
influx_args[key] = self._influxdb_config[key]
if influxdb and influx_args:
try:
return influxdb.InfluxDBClient(**influx_args)
except Exception:
self.log.warning('Cannot establish connection to InfluxDB')
else:
self.log.warning('InfluxDB configuration is present, '
'but no client library is found.')
return None

View File

@ -142,7 +142,7 @@ class OpenStackConfig(object):
app_name=None, app_version=None,
load_yaml_config=True, load_envvars=True,
statsd_host=None, statsd_port=None,
statsd_prefix=None):
statsd_prefix=None, influxdb_config=None):
self.log = _log.setup_logging('openstack.config')
self._session_constructor = session_constructor
self._app_name = app_name
@ -254,6 +254,7 @@ class OpenStackConfig(object):
self._cache_class = 'dogpile.cache.null'
self._cache_arguments = {}
self._cache_expirations = {}
self._influxdb_config = {}
if 'cache' in self.cloud_config:
cache_settings = _util.normalize_keys(self.cloud_config['cache'])
@ -279,11 +280,34 @@ class OpenStackConfig(object):
'expiration', self._cache_expirations)
if load_yaml_config:
statsd_config = self.cloud_config.get('statsd', {})
metrics_config = self.cloud_config.get('metrics', {})
statsd_config = metrics_config.get('statsd', {})
statsd_host = statsd_host or statsd_config.get('host')
statsd_port = statsd_port or statsd_config.get('port')
statsd_prefix = statsd_prefix or statsd_config.get('prefix')
influxdb_cfg = metrics_config.get('influxdb', {})
# Parse InfluxDB configuration
if influxdb_config:
influxdb_cfg.update(influxdb_config)
if influxdb_cfg:
config = {}
if 'use_udp' in influxdb_cfg:
use_udp = influxdb_cfg['use_udp']
if isinstance(use_udp, str):
use_udp = use_udp.lower() in ('true', 'yes', '1')
elif not isinstance(use_udp, bool):
use_udp = False
self.log.warning('InfluxDB.use_udp value type is not '
'supported. Use one of '
'[true|false|yes|no|1|0]')
config['use_udp'] = use_udp
for key in ['host', 'port', 'username', 'password', 'database',
'measurement', 'timeout']:
if key in influxdb_cfg:
config[key] = influxdb_cfg[key]
self._influxdb_config = config
if load_envvars:
statsd_host = statsd_host or os.environ.get('STATSD_HOST')
statsd_port = statsd_port or os.environ.get('STATSD_PORT')
@ -1112,6 +1136,7 @@ class OpenStackConfig(object):
statsd_host=self._statsd_host,
statsd_port=self._statsd_port,
statsd_prefix=self._statsd_prefix,
influxdb_config=self._influxdb_config,
)
# TODO(mordred) Backwards compat for OSC transition
get_one_cloud = get_one

View File

@ -42,6 +42,38 @@ class Proxy(proxy.Proxy):
log = _log.setup_logging('openstack')
def _extract_name(self, url, service_type=None, project_id=None):
url_path = parse.urlparse(url).path.strip()
# Remove / from the beginning to keep the list indexes of interesting
# things consistent
if url_path.startswith('/'):
url_path = url_path[1:]
# Split url into parts and exclude potential project_id in some urls
url_parts = [
x for x in url_path.split('/') if (
x != project_id
and (
not project_id
or (project_id and x != 'AUTH_' + project_id)
))
]
# Strip leading version piece so that
# GET /v1/AUTH_xxx
# returns ['AUTH_xxx']
if (url_parts[0]
and url_parts[0][0] == 'v'
and url_parts[0][1] and url_parts[0][1].isdigit()):
url_parts = url_parts[1:]
name_parts = self._extract_name_consume_url_parts(url_parts)
# Getting the root of an endpoint is doing version discovery
if not name_parts:
name_parts = ['account']
# Strip out anything that's empty or None
return [part for part in name_parts if part]
def get_account_metadata(self):
"""Get metadata for this account.

View File

@ -26,6 +26,21 @@ from openstack import resource
class Proxy(proxy.Proxy):
def _extract_name_consume_url_parts(self, url_parts):
if (len(url_parts) == 3 and url_parts[0] == 'software_deployments'
and url_parts[1] == 'metadata'):
# Another nice example of totally different URL naming scheme,
# which we need to repair /software_deployment/metadata/server_id -
# just replace server_id with metadata to keep further logic
return ['software_deployment', 'metadata']
if (url_parts[0] == 'stacks' and len(url_parts) > 2
and not url_parts[2] in ['preview', 'resources']):
# orchestrate introduce having stack name and id part of the URL
# (/stacks/name/id/everything_else), so if on third position we
# have not a known part - discard it, not to brake further logic
del url_parts[2]
return super(Proxy, self)._extract_name_consume_url_parts(url_parts)
def read_env_and_templates(self, template_file=None, template_url=None,
template_object=None, files=None,
environment_files=None):

View File

@ -24,69 +24,6 @@ from openstack import exceptions
from openstack import resource
def _extract_name(url, service_type=None):
'''Produce a key name to use in logging/metrics from the URL path.
We want to be able to logic/metric sane general things, so we pull
the url apart to generate names. The function returns a list because
there are two different ways in which the elements want to be combined
below (one for logging, one for statsd)
Some examples are likely useful:
/servers -> ['servers']
/servers/{id} -> ['servers']
/servers/{id}/os-security-groups -> ['servers', 'os-security-groups']
/v2.0/networks.json -> ['networks']
'''
url_path = urllib.parse.urlparse(url).path.strip()
# Remove / from the beginning to keep the list indexes of interesting
# things consistent
if url_path.startswith('/'):
url_path = url_path[1:]
# Special case for neutron, which puts .json on the end of urls
if url_path.endswith('.json'):
url_path = url_path[:-len('.json')]
url_parts = url_path.split('/')
if url_parts[-1] == 'detail':
# Special case detail calls
# GET /servers/detail
# returns ['servers', 'detail']
name_parts = url_parts[-2:]
else:
# Strip leading version piece so that
# GET /v2.0/networks
# returns ['networks']
if (url_parts[0]
and url_parts[0][0] == 'v'
and url_parts[0][1] and url_parts[0][1].isdigit()):
url_parts = url_parts[1:]
name_parts = []
# Pull out every other URL portion - so that
# GET /servers/{id}/os-security-groups
# returns ['servers', 'os-security-groups']
for idx in range(0, len(url_parts)):
if not idx % 2 and url_parts[idx]:
name_parts.append(url_parts[idx])
# Keystone Token fetching is a special case, so we name it "tokens"
if url_path.endswith('tokens'):
name_parts = ['tokens']
# Getting the root of an endpoint is doing version discovery
if not name_parts:
if service_type == 'object-store':
name_parts = ['account']
else:
name_parts = ['discovery']
# Strip out anything that's empty or None
return [part for part in name_parts if part]
# The _check_resource decorator is used on Proxy methods to ensure that
# the `actual` argument is in fact the type of the `expected` argument.
# It does so under two cases:
@ -126,6 +63,7 @@ class Proxy(adapter.Adapter):
session,
statsd_client=None, statsd_prefix=None,
prometheus_counter=None, prometheus_histogram=None,
influxdb_config=None, influxdb_client=None,
*args, **kwargs):
# NOTE(dtantsur): keystoneauth defaults retriable_status_codes to None,
# override it with a class-level value.
@ -136,6 +74,8 @@ class Proxy(adapter.Adapter):
self._statsd_prefix = statsd_prefix
self._prometheus_counter = prometheus_counter
self._prometheus_histogram = prometheus_histogram
self._influxdb_client = influxdb_client
self._influxdb_config = influxdb_config
if self.service_type:
log_name = 'openstack.{0}'.format(self.service_type)
else:
@ -154,18 +94,107 @@ class Proxy(adapter.Adapter):
self._report_stats(response)
return response
def _extract_name(self, url, service_type=None, project_id=None):
'''Produce a key name to use in logging/metrics from the URL path.
We want to be able to logic/metric sane general things, so we pull
the url apart to generate names. The function returns a list because
there are two different ways in which the elements want to be combined
below (one for logging, one for statsd)
Some examples are likely useful:
/servers -> ['servers']
/servers/{id} -> ['server']
/servers/{id}/os-security-groups -> ['server', 'os-security-groups']
/v2.0/networks.json -> ['networks']
'''
url_path = urllib.parse.urlparse(url).path.strip()
# Remove / from the beginning to keep the list indexes of interesting
# things consistent
if url_path.startswith('/'):
url_path = url_path[1:]
# Special case for neutron, which puts .json on the end of urls
if url_path.endswith('.json'):
url_path = url_path[:-len('.json')]
# Split url into parts and exclude potential project_id in some urls
url_parts = [
x for x in url_path.split('/') if (
x != project_id
and (
not project_id
or (project_id and x != 'AUTH_' + project_id)
))
]
if url_parts[-1] == 'detail':
# Special case detail calls
# GET /servers/detail
# returns ['servers', 'detail']
name_parts = url_parts[-2:]
else:
# Strip leading version piece so that
# GET /v2.0/networks
# returns ['networks']
if (url_parts[0]
and url_parts[0][0] == 'v'
and url_parts[0][1] and url_parts[0][1].isdigit()):
url_parts = url_parts[1:]
name_parts = self._extract_name_consume_url_parts(url_parts)
# Keystone Token fetching is a special case, so we name it "tokens"
# NOTE(gtema): there is no metric triggered for regular authorization
# with openstack.connect(), since it bypassed SDK and goes directly to
# keystoneauth1. If you need to measure performance of the token
# fetching - trigger a separate call.
if url_path.endswith('tokens'):
name_parts = ['tokens']
if not name_parts:
name_parts = ['discovery']
# Strip out anything that's empty or None
return [part for part in name_parts if part]
def _extract_name_consume_url_parts(self, url_parts):
"""Pull out every other URL portion - so that
GET /servers/{id}/os-security-groups
returns ['server', 'os-security-groups']
"""
name_parts = []
for idx in range(0, len(url_parts)):
if not idx % 2 and url_parts[idx]:
# If we are on first segment and it end with 's' stip this 's'
# to differentiate LIST and GET_BY_ID
if (len(url_parts) > idx + 1
and url_parts[idx][-1] == 's'
and url_parts[idx][-2:] != 'is'):
name_parts.append(url_parts[idx][:-1])
else:
name_parts.append(url_parts[idx])
return name_parts
def _report_stats(self, response):
if self._statsd_client:
self._report_stats_statsd(response)
if self._prometheus_counter and self._prometheus_histogram:
self._report_stats_prometheus(response)
if self._influxdb_client:
self._report_stats_influxdb(response)
def _report_stats_statsd(self, response):
name_parts = _extract_name(response.request.url, self.service_type)
name_parts = self._extract_name(response.request.url,
self.service_type,
self.session.get_project_id())
key = '.'.join(
[self._statsd_prefix, self.service_type, response.request.method]
+ name_parts)
self._statsd_client.timing(key, int(response.elapsed.seconds * 1000))
self._statsd_client.timing(key, int(
response.elapsed.microseconds / 1000))
self._statsd_client.incr(key)
def _report_stats_prometheus(self, response):
@ -177,7 +206,35 @@ class Proxy(adapter.Adapter):
)
self._prometheus_counter.labels(**labels).inc()
self._prometheus_histogram.labels(**labels).observe(
response.elapsed.seconds)
response.elapsed.microseconds / 1000)
def _report_stats_influxdb(self, response):
# NOTE(gtema): status_code is saved both as tag and field to give
# ability showing it as a value and not only as a legend.
# However Influx is not ok with having same name in tags and fields,
# therefore use different names.
data = [dict(
measurement=(self._influxdb_config.get('measurement',
'openstack_api')
if self._influxdb_config else 'openstack_api'),
tags=dict(
method=response.request.method,
service_type=self.service_type,
status_code=response.status_code,
name='_'.join(self._extract_name(
response.request.url, self.service_type,
self.session.get_project_id())
)
),
fields=dict(
duration=int(response.elapsed.microseconds / 1000),
status_code_val=int(response.status_code)
)
)]
try:
self._influxdb_client.write_points(data)
except Exception:
self.log.exception('Error writing statistics to InfluxDB')
def _version_matches(self, version):
api_version = self.get_api_major_version()

View File

@ -9,6 +9,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
import random
import string
@ -237,3 +238,15 @@ class TestDownloadObject(base_test_object.BaseTestObject):
self.assertLessEqual(chunk_len, chunk_size)
self.assertEqual(chunk, self.the_data[start:end])
self.assert_calls()
class TestExtractName(TestObjectStoreProxy):
scenarios = [
('discovery', dict(url='/', parts=['account']))
]
def test_extract_name(self):
results = self.proxy._extract_name(self.url)
self.assertEqual(self.parts, results)

View File

@ -9,6 +9,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
import mock
import six
@ -312,3 +313,33 @@ class TestOrchestrationProxy(test_proxy_base.TestProxyBase):
None, template_url=None)
self.assertEqual("'template_url' must be specified when template is "
"None", six.text_type(err))
class TestExtractName(TestOrchestrationProxy):
scenarios = [
('stacks', dict(url='/stacks', parts=['stacks'])),
('name_id', dict(url='/stacks/name/id', parts=['stack'])),
('identity', dict(url='/stacks/id', parts=['stack'])),
('preview', dict(url='/stacks/name/preview',
parts=['stack', 'preview'])),
('stack_act', dict(url='/stacks/name/id/preview',
parts=['stack', 'preview'])),
('stack_subres', dict(url='/stacks/name/id/resources',
parts=['stack', 'resources'])),
('stack_subres_id', dict(url='/stacks/name/id/resources/id',
parts=['stack', 'resource'])),
('stack_subres_id_act',
dict(url='/stacks/name/id/resources/id/action',
parts=['stack', 'resource', 'action'])),
('event',
dict(url='/stacks/ignore/ignore/resources/ignore/events/id',
parts=['stack', 'resource', 'event'])),
('sd_metadata', dict(url='/software_deployments/metadata/ignore',
parts=['software_deployment', 'metadata']))
]
def test_extract_name(self):
results = self.proxy._extract_name(self.url)
self.assertEqual(self.parts, results)

View File

@ -467,19 +467,20 @@ class TestExtractName(base.TestCase):
scenarios = [
('slash_servers_bare', dict(url='/servers', parts=['servers'])),
('slash_servers_arg', dict(url='/servers/1', parts=['servers'])),
('slash_servers_arg', dict(url='/servers/1', parts=['server'])),
('servers_bare', dict(url='servers', parts=['servers'])),
('servers_arg', dict(url='servers/1', parts=['servers'])),
('servers_arg', dict(url='servers/1', parts=['server'])),
('networks_bare', dict(url='/v2.0/networks', parts=['networks'])),
('networks_arg', dict(url='/v2.0/networks/1', parts=['networks'])),
('networks_arg', dict(url='/v2.0/networks/1', parts=['network'])),
('tokens', dict(url='/v3/tokens', parts=['tokens'])),
('discovery', dict(url='/', parts=['discovery'])),
('secgroups', dict(
url='/servers/1/os-security-groups',
parts=['servers', 'os-security-groups'])),
parts=['server', 'os-security-groups'])),
('bm_chassis', dict(url='/v1/chassis/id', parts=['chassis']))
]
def test_extract_name(self):
results = proxy._extract_name(self.url)
results = proxy.Proxy(mock.Mock())._extract_name(self.url)
self.assertEqual(self.parts, results)

View File

@ -0,0 +1,4 @@
---
features:
- |
Add possibility to report API metrics into InfluxDB.