From bdbb6d62ee20bfd5ffc59f8772a5a0e60614ba90 Mon Sep 17 00:00:00 2001
From: Ching Kuo
Date: Mon, 9 Nov 2020 10:17:20 +0800
Subject: [PATCH] Add Support For oslo.metrics

This commit adds support for sending rpc metrics to oslo.metrics.

Changes include:
- Adding a client wrapper for oslo.metrics to process metrics
  information and send it to the oslo.metrics socket
- Modifying the rpc client to send metrics when certain rpc events happen

For more information on oslo.metrics, see
https://opendev.org/openstack/oslo.metrics

Change-Id: Idf8cc0e52ced1f697ac4048655eff4c956fd5c79
---
 oslo_messaging/_metrics/__init__.py           |  19 ++
 oslo_messaging/_metrics/client.py             | 256 ++++++++++++++++++
 oslo_messaging/conffixture.py                 |   4 +
 oslo_messaging/rpc/client.py                  |  53 ++--
 .../tests/functional/test_functional.py       |  34 +++
 ...oslo-metrics-support-fe16343a637cc14b.yaml |   8 +
 requirements.txt                              |   3 +
 7 files changed, 360 insertions(+), 17 deletions(-)
 create mode 100644 oslo_messaging/_metrics/__init__.py
 create mode 100644 oslo_messaging/_metrics/client.py
 create mode 100644 releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml

diff --git a/oslo_messaging/_metrics/__init__.py b/oslo_messaging/_metrics/__init__.py
new file mode 100644
index 000000000..d624714f0
--- /dev/null
+++ b/oslo_messaging/_metrics/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2020 LINE Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License'); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+__all__ = [
+    'MetricsCollectorClient',
+    'get_collector',
+]
+
+from .client import *
diff --git a/oslo_messaging/_metrics/client.py b/oslo_messaging/_metrics/client.py
new file mode 100644
index 000000000..46916a150
--- /dev/null
+++ b/oslo_messaging/_metrics/client.py
@@ -0,0 +1,313 @@
+
+# Copyright 2020 LINE Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import queue
+import socket
+import threading
+import time
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_metrics import message_type
+from oslo_utils import eventletutils
+from oslo_utils import importutils
+
+
+LOG = logging.getLogger(__name__)
+
+eventlet = importutils.try_import('eventlet')
+if eventlet and eventletutils.is_monkey_patched("thread"):
+    # The thread module is monkey patched by eventlet/greenlet: use the
+    # original (native) python threading module instead.
+    stdlib_threading = eventlet.patcher.original('threading')
+else:
+    # No monkey patching: the threading module imported above is
+    # already the native one.
+    stdlib_threading = threading
+
+oslo_messaging_metrics = [
+    cfg.BoolOpt('metrics_enabled', default=False,
+                help='Boolean to send rpc metrics to oslo.metrics.'),
+    cfg.IntOpt('metrics_buffer_size', default=1000,
+               help='Buffer size to store in oslo.messaging.'),
+    cfg.StrOpt('metrics_socket_file',
+               default='/var/tmp/metrics_collector.sock',
+               help='Unix domain socket file to be used'
+                    ' to send rpc related metrics'),
+    cfg.StrOpt('metrics_process_name',
+               default='',
+               help='Process name which is used to identify which process'
+                    ' produce metrics'),
+    cfg.IntOpt('metrics_thread_stop_timeout',
+               default=10,
+               help='Sending thread stop once metrics_thread_stop_timeout'
+                    ' seconds after the last successful metrics send.'
+                    ' So that this thread will not be the blocker'
+                    ' when process is shutting down.'
+                    ' If the process is still running, sending thread will'
+                    ' be restarted at the next metrics queueing time')
+]
+cfg.CONF.register_opts(oslo_messaging_metrics, group='oslo_messaging_metrics')
+
+
+class MetricsCollectorClient(object):
+    """Buffer RPC metrics and send them to the oslo.metrics unix socket.
+
+    Metrics are queued in a bounded queue and sent by a background thread.
+    The instance is also a context manager that reports the start, the
+    duration and the end of an invocation when metrics are enabled.
+    """
+
+    def __init__(self, conf, metrics_type, **kwargs):
+        """Read the oslo_messaging_metrics options and prepare the sender.
+
+        :param conf: configuration object holding the
+                     oslo_messaging_metrics group
+        :param metrics_type: prefix of the metric methods called on
+                             enter/exit, e.g. "rpc_client"
+        :param kwargs: keyword arguments passed to those metric methods
+        """
+        self.conf = conf.oslo_messaging_metrics
+        self.unix_socket = self.conf.metrics_socket_file
+        buffer_size = self.conf.metrics_buffer_size
+        self.tx_queue = queue.Queue(buffer_size)
+        self.next_send_metric = None
+        self.metrics_type = metrics_type
+        self.args = kwargs
+        self.send_thread = threading.Thread(target=self.send_loop)
+        # Do not spawn a sender thread when metrics are disabled;
+        # put_into_txqueue() starts one as soon as a metric is queued.
+        if self.conf.metrics_enabled:
+            self.send_thread.start()
+
+    def __enter__(self):
+        """Emit the invocation start metric.
+
+        :returns: this collector, or None when metrics are disabled
+        """
+        if not self.conf.metrics_enabled:
+            return None
+        self.start_time = time.time()
+        send_method = getattr(self, self.metrics_type +
+                              "_invocation_start_total")
+        send_method(**self.args)
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Emit the processing time and invocation end metrics."""
+        if self.conf.metrics_enabled:
+            duration = time.time() - self.start_time
+            send_method = getattr(
+                self, self.metrics_type + "_processing_seconds")
+            send_method(duration=duration, **self.args)
+            send_method = getattr(
+                self, self.metrics_type + "_invocation_end_total")
+            send_method(**self.args)
+
+    def put_into_txqueue(self, metrics_name, action, **labels):
+        """Queue one metric and make sure a sender thread is running.
+
+        The metric is dropped with a warning when the queue is full.
+        """
+        labels['process'] = self.conf.metrics_process_name
+        m = message_type.Metric("oslo_messaging", metrics_name, action,
+                                **labels)
+
+        try:
+            self.tx_queue.put_nowait(m)
+        except queue.Full:
+            LOG.warning("tx queues is already full(%s/%s). Fails to "
+                        "send the metrics(%s)",
+                        self.tx_queue.qsize(), self.tx_queue.maxsize, m)
+
+        if not self.send_thread.is_alive():
+            self.send_thread = threading.Thread(target=self.send_loop)
+            self.send_thread.start()
+
+    def send_loop(self):
+        """Send queued metrics until the stop timeout expires.
+
+        The loop ends once metrics_thread_stop_timeout seconds have passed
+        since it started or since the last successful send. A metric that
+        could not be sent is kept and retried one second later.
+        """
+        timeout = self.conf.metrics_thread_stop_timeout
+        stoptime = time.time() + timeout
+        while stoptime > time.time():
+            if self.next_send_metric is None:
+                try:
+                    self.next_send_metric = self.tx_queue.get(timeout=timeout)
+                except queue.Empty:
+                    continue
+            try:
+                self.send_metric(self.next_send_metric)
+                self.next_send_metric = None
+                stoptime = time.time() + timeout
+            except Exception as e:
+                LOG.error("Failed to send metrics: %s. "
+                          "Wait 1 seconds for next try.", e)
+                time.sleep(1)
+
+    def send_metric(self, metric):
+        """Send one metric as a JSON datagram to the unix socket."""
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        try:
+            s.connect(self.unix_socket)
+            s.send(metric.to_json().encode())
+        finally:
+            # Close the socket even when connect() or send() fails.
+            s.close()
+
+    def put_rpc_client_metrics_to_txqueue(self, metric_name, action,
+                                          target, method, call_type, timeout,
+                                          exception=None):
+        """Queue a rpc client metric labelled with the call details."""
+        kwargs = {
+            'call_type': call_type,
+            'exchange': target.exchange,
+            'topic': target.topic,
+            'namespace': target.namespace,
+            'version': target.version,
+            'server': target.server,
+            'fanout': target.fanout,
+            'method': method,
+            'timeout': timeout,
+        }
+        if exception:
+            kwargs['exception'] = exception
+
+        self.put_into_txqueue(metric_name, action, **kwargs)
+
+    def rpc_client_invocation_start_total(self, target, method, call_type,
+                                          timeout=None):
+        """Count a started rpc client invocation."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_invocation_start_total",
+            message_type.MetricAction("inc", None),
+            target, method, call_type, timeout
+        )
+
+    def rpc_client_invocation_end_total(self, target, method, call_type,
+                                        timeout=None):
+        """Count a finished rpc client invocation."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_invocation_end_total",
+            message_type.MetricAction("inc", None),
+            target, method, call_type, timeout
+        )
+
+    def rpc_client_processing_seconds(self, target, method, call_type,
+                                      duration, timeout=None):
+        """Observe the duration of a rpc client invocation."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_processing_seconds",
+            message_type.MetricAction("observe", duration),
+            target, method, call_type, timeout
+        )
+
+    def rpc_client_exception_total(self, target, method, call_type, exception,
+                                   timeout=None):
+        """Count an exception raised by a rpc client invocation."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_exception_total",
+            message_type.MetricAction("inc", None),
+            target, method, call_type, timeout, exception
+        )
+
+    def put_rpc_server_metrics_to_txqueue(self, metric_name, action,
+                                          target, endpoint, ns, ver, method,
+                                          exception=None):
+        """Queue a rpc server metric labelled with the call details."""
+        kwargs = {
+            'endpoint': endpoint,
+            'namespace': ns,
+            'version': ver,
+            'method': method,
+            'exchange': None,
+            'topic': None,
+            'server': None
+        }
+        if target:
+            kwargs['exchange'] = target.exchange
+            kwargs['topic'] = target.topic
+            kwargs['server'] = target.server
+        if exception:
+            kwargs['exception'] = exception
+
+        self.put_into_txqueue(metric_name, action, **kwargs)
+
+    def rpc_server_invocation_start_total(self, target, endpoint,
+                                          ns, ver, method):
+        """Count a started rpc server invocation."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_invocation_start_total",
+            message_type.MetricAction("inc", None),
+            target, endpoint, ns, ver, method
+        )
+
+    def rpc_server_invocation_end_total(self, target, endpoint,
+                                        ns, ver, method):
+        """Count a finished rpc server invocation."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_invocation_end_total",
+            message_type.MetricAction("inc", None),
+            target, endpoint, ns, ver, method
+        )
+
+    def rpc_server_processing_seconds(self, target, endpoint, ns, ver,
+                                      method, duration):
+        """Observe the duration of a rpc server invocation."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_processing_seconds",
+            message_type.MetricAction("observe", duration),
+            target, endpoint, ns, ver, method
+        )
+
+    def rpc_server_exception_total(self, target, endpoint, ns, ver,
+                                   method, exception):
+        """Count an exception raised by a rpc server invocation."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_exception_total",
+            message_type.MetricAction("inc", None),
+            target, endpoint, ns, ver, method, exception=exception
+        )
+
+
+METRICS_COLLECTOR = None
+
+
+def get_collector(conf, metrics_type, **kwargs):
+    """Return the process-wide collector prepared for one invocation.
+
+    The collector is created on first use. On later calls its metrics
+    type and arguments are replaced by the given ones, so that the
+    metrics emitted on enter/exit describe the current invocation
+    instead of the first one.
+    """
+    # Use the native threading module selected at import time for the
+    # sender threads created from now on.
+    global threading
+    threading = stdlib_threading
+    global METRICS_COLLECTOR
+    if METRICS_COLLECTOR is None:
+        METRICS_COLLECTOR = MetricsCollectorClient(
+            conf, metrics_type, **kwargs)
+    else:
+        # NOTE(review): the collector is shared, so invocations that
+        # overlap in time still overwrite each other's arguments and
+        # start time; fixing that needs a per-invocation context.
+        METRICS_COLLECTOR.metrics_type = metrics_type
+        METRICS_COLLECTOR.args = kwargs
+    return METRICS_COLLECTOR
diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py
index 39e76151d..2656ebf6c 100644
--- a/oslo_messaging/conffixture.py
+++ b/oslo_messaging/conffixture.py
@@ -67,6 +67,10 @@ class ConfFixture(fixtures.Fixture):
                      'oslo_messaging.notify.notifier',
                      '_notifier_opts',
                      'oslo_messaging_notifications')
+        _import_opts(self.conf,
+                     'oslo_messaging._metrics.client',
+                     'oslo_messaging_metrics',
+                     'oslo_messaging_metrics')
 
         if transport_url is not None:
             self.transport_url = transport_url
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index 115198bf5..cbec52554 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -21,6 +21,7 @@ import logging
 from oslo_config import cfg
 
 from oslo_messaging._drivers import base as driver_base
+from oslo_messaging import _metrics as metrics
 from oslo_messaging import _utils as utils
 from oslo_messaging import exceptions
 from oslo_messaging import serializer as msg_serializer
@@ -146,12 +147,25 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
 
         self._check_version_cap(msg.get('version'))
 
-        try:
-            self.transport._send(self.target, msg_ctxt, msg,
-                                 retry=self.retry,
-                                 transport_options=self.transport_options)
-        except driver_base.TransportDriverError as ex:
-            raise ClientSendError(self.target, ex)
+        with metrics.get_collector(self.conf, "rpc_client",
+                                   target=self.target,
+                                   method=method,
+                                   call_type="cast") as metrics_collector:
+            try:
+                self.transport._send(self.target, msg_ctxt, msg,
+                                     retry=self.retry,
+                                     transport_options=self.transport_options)
+            except driver_base.TransportDriverError as ex:
+                # metrics_collector is None when metrics are disabled.
+                if metrics_collector is not None:
+                    metrics_collector.rpc_client_exception_total(
+                        self.target, method, "cast", ex.__class__.__name__)
+                raise ClientSendError(self.target, ex)
+            except Exception as ex:
+                if metrics_collector is not None:
+                    metrics_collector.rpc_client_exception_total(
+                        self.target, method, "cast", ex.__class__.__name__)
+                raise
 
     def call(self, ctxt, method, **kwargs):
         """Invoke a method and wait for a reply. See RPCClient.call()."""
@@ -170,17 +184,26 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
 
         self._check_version_cap(msg.get('version'))
 
-        try:
-            result = \
-                self.transport._send(self.target, msg_ctxt, msg,
-                                     wait_for_reply=True, timeout=timeout,
-                                     call_monitor_timeout=cm_timeout,
-                                     retry=self.retry,
-                                     transport_options=self.transport_options)
-        except driver_base.TransportDriverError as ex:
-            raise ClientSendError(self.target, ex)
-
-        return self.serializer.deserialize_entity(ctxt, result)
+        with metrics.get_collector(self.conf, "rpc_client",
+                                   target=self.target, method=method,
+                                   call_type="call") as metrics_collector:
+            try:
+                result = self.transport._send(
+                    self.target, msg_ctxt, msg, wait_for_reply=True,
+                    timeout=timeout, call_monitor_timeout=cm_timeout,
+                    retry=self.retry, transport_options=self.transport_options)
+            except driver_base.TransportDriverError as ex:
+                # metrics_collector is None when metrics are disabled.
+                if metrics_collector is not None:
+                    metrics_collector.rpc_client_exception_total(
+                        self.target, method, "call", ex.__class__.__name__)
+                raise ClientSendError(self.target, ex)
+            except Exception as ex:
+                if metrics_collector is not None:
+                    metrics_collector.rpc_client_exception_total(
+                        self.target, method, "call", ex.__class__.__name__)
+                raise
+            return self.serializer.deserialize_entity(ctxt, result)
 
     @abc.abstractmethod
     def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py
index a71f6c25f..3c503ff6a 100644
--- a/oslo_messaging/tests/functional/test_functional.py
+++ b/oslo_messaging/tests/functional/test_functional.py
@@ -12,6 +12,8 @@
 #    under the License.
import os +import requests +import subprocess import time import uuid @@ -565,3 +567,35 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual('test', event[1]) self.assertEqual('Hello World!', event[2]) self.assertEqual('abc', event[3]) + + +class MetricsTestCase(utils.SkipIfNoTransportURL): + + def setUp(self): + super(MetricsTestCase, self).setUp(conf=cfg.ConfigOpts()) + if self.rpc_url.startswith("kafka://"): + self.skipTest("kafka does not support RPC API") + + self.config(metrics_enabled=True, + group='oslo_messaging_metrics') + + def test_functional(self): + # verify call metrics is sent and reflected in oslo.metrics + self.config(metrics_socket_file='/var/tmp/metrics_collector.sock', + group='oslo_messaging_metrics') + metric_server = subprocess.Popen(["python3", "-m", "oslo_metrics"]) + time.sleep(1) + group = self.useFixture( + utils.RpcServerGroupFixture(self.conf, self.rpc_url)) + client = group.client(1) + client.add(increment=1) + time.sleep(1) + r = requests.get('http://localhost:3000') + for line in r.text.split('\n'): + if 'client_invocation_start_total{' in line: + self.assertEqual('1.0', line[-3:]) + elif 'client_invocation_end_total{' in line: + self.assertEqual('1.0', line[-3:]) + elif 'client_processing_seconds_count{' in line: + self.assertEqual('1.0', line[-3:]) + metric_server.terminate() diff --git a/releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml b/releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml new file mode 100644 index 000000000..541c98743 --- /dev/null +++ b/releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + | Introduce support for sending rpc client metrics to oslo.metrics. 
+ | This feature can be enabled by setting a configuration parameter: + + [oslo_messaging_metrics] + metrics_enabled = True # default is false diff --git a/requirements.txt b/requirements.txt index 6cc4336be..e02718844 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,3 +28,6 @@ kombu>=4.6.6 # BSD # middleware oslo.middleware>=3.31.0 # Apache-2.0 + +# metrics +oslo.metrics>=0.2.1 # Apache-2.0