Add Support For oslo.metrics

This commit adds support for sending RPC metrics to oslo.metrics.

Changes include:
- Adding a client wrapper for oslo.metrics that processes metrics
  information and sends it to the oslo.metrics socket
- Modifying the RPC client to send metrics when certain RPC events happen

For more information on oslo.metrics
https://opendev.org/openstack/oslo.metrics

Change-Id: Idf8cc0e52ced1f697ac4048655eff4c956fd5c79
This commit is contained in:
Ching Kuo 2020-11-09 10:17:20 +08:00
parent 5aa645b38b
commit c86492c1b8
6 changed files with 352 additions and 17 deletions

View File

@ -67,6 +67,10 @@ class ConfFixture(fixtures.Fixture):
'oslo_messaging.notify.notifier',
'_notifier_opts',
'oslo_messaging_notifications')
_import_opts(self.conf,
'oslo_messaging.metrics.client',
'oslo_messaging_metrics',
'oslo_messaging_metrics')
if transport_url is not None:
self.transport_url = transport_url

View File

@ -0,0 +1,19 @@
# Copyright 2020 LINE Corp.
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Public API of the metrics package. The names listed here must exist in
# .client; the collector accessor defined there is ``get_collector``.
__all__ = [
    'MetricsCollectorClient',
    'get_collector',
]

# The star import is kept so that every public name of .client remains
# reachable through the package namespace, exactly as before.
from .client import *

View File

@ -0,0 +1,256 @@
# Copyright 2020 LINE Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import queue
import socket
import threading
import time
from oslo_config import cfg
from oslo_log import log as logging
from oslo_metrics import message_type
from oslo_utils import eventletutils
from oslo_utils import importutils
LOG = logging.getLogger(__name__)

eventlet = importutils.try_import('eventlet')

# Default to the regular threading module; this is the right choice when
# eventlet is not installed or has not monkey patched "thread".
stdlib_threading = threading
if eventlet and eventletutils.is_monkey_patched("thread"):
    # eventlet has replaced the thread primitives, so fetch the original,
    # unpatched threading module. get_collector() rebinds this module's
    # ``threading`` name to it before the metrics client is created.
    stdlib_threading = eventlet.patcher.original('threading')
# Configuration options of the [oslo_messaging_metrics] group.
# Multi-line help texts rely on implicit string concatenation, so every
# fragment except the last one must end with a space; otherwise the words
# at the seam are glued together in the generated documentation.
oslo_messaging_metrics = [
    cfg.BoolOpt('metrics_enabled', default=False,
                help='Boolean to send rpc metrics to oslo.metrics.'),
    cfg.IntOpt('metrics_buffer_size', default=1000,
               help='Buffer size to store in oslo.messaging.'),
    cfg.StrOpt('metrics_socket_file',
               default='/var/tmp/metrics_collector.sock',
               help='Unix domain socket file to be used '
                    'to send rpc related metrics'),
    cfg.StrOpt('metrics_process_name',
               default='',
               help='Process name which is used to identify which process '
                    'produce metrics'),
    cfg.IntOpt('metrics_thread_stop_timeout',
               default=10,
               help='Sending thread stop once metrics_thread_stop_timeout '
                    'seconds after the last successful metrics send. '
                    'So that this thread will not be the blocker '
                    'when process is shutting down. '
                    'If the process is still running, sending thread will '
                    'be restarted at the next metrics queueing time')
]

# Register on the global configuration object at import time.
cfg.CONF.register_opts(oslo_messaging_metrics, group='oslo_messaging_metrics')
class MetricsCollectorClient(object):
    """Queue RPC metrics and send them to a unix datagram socket.

    Metrics are put into a bounded in-memory queue and sent by a
    background thread, one datagram per metric, to the socket file
    configured by ``[oslo_messaging_metrics] metrics_socket_file``.

    The instance is also a context manager: when metrics are enabled,
    entering emits ``<metrics_type>_invocation_start_total`` and leaving
    emits ``<metrics_type>_processing_seconds`` followed by
    ``<metrics_type>_invocation_end_total``.
    """

    def __init__(self, conf, metrics_type, **kwargs):
        """Create the queue and start the sending thread.

        :param conf: configuration object exposing the
                     ``oslo_messaging_metrics`` group
        :param metrics_type: method-name prefix used by the context
                             manager, e.g. ``"rpc_client"``
        :param kwargs: keyword arguments forwarded to the metric methods
                       invoked by ``__enter__`` and ``__exit__``
        """
        self.conf = conf
        self.unix_socket = self.conf.oslo_messaging_metrics.metrics_socket_file
        buffer_size = self.conf.oslo_messaging_metrics.metrics_buffer_size
        self.tx_queue = queue.Queue(buffer_size)
        # Metric taken from the queue but not yet sent successfully; it is
        # kept here so that a failed send is retried with the same metric.
        self.next_send_metric = None
        self.metrics_type = metrics_type
        self.args = kwargs
        self.send_thread = threading.Thread(target=self.send_loop)
        self.send_thread.start()

    def __enter__(self):
        """Record the start time and emit the invocation-start metric.

        :returns: ``self``, or ``None`` when metrics are disabled
        """
        if not self.conf.oslo_messaging_metrics.metrics_enabled:
            return None
        self.start_time = time.time()
        send_method = getattr(self, self.metrics_type +
                              "_invocation_start_total")
        send_method(**self.args)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Emit the duration and invocation-end metrics when enabled.

        Nothing is returned, so an exception raised inside the ``with``
        body keeps propagating.
        """
        if self.conf.oslo_messaging_metrics.metrics_enabled:
            duration = time.time() - self.start_time
            send_method = getattr(
                self, self.metrics_type + "_processing_seconds")
            send_method(duration=duration, **self.args)
            send_method = getattr(
                self, self.metrics_type + "_invocation_end_total")
            send_method(**self.args)

    def put_into_txqueue(self, metrics_name, action, **labels):
        """Build a metric and enqueue it without blocking.

        A ``process`` label is added from ``metrics_process_name``. When
        the queue is full the metric is dropped and a warning is logged.
        The sending thread is restarted if it is no longer alive.

        :param metrics_name: name of the metric
        :param action: ``message_type.MetricAction`` to apply
        :param labels: labels attached to the metric
        """
        # NOTE(review): this reads the global cfg.CONF while __init__ reads
        # self.conf -- confirm whether both are meant to be the same object.
        labels['process'] = \
            cfg.CONF.oslo_messaging_metrics.metrics_process_name
        m = message_type.Metric("oslo_messaging", metrics_name, action,
                                **labels)

        try:
            self.tx_queue.put_nowait(m)
        except queue.Full:
            # Arguments are handed to the logger so the message is only
            # formatted when the record is actually emitted.
            LOG.warning("tx queues is already full(%s/%s). Fails to "
                        "send the metrics(%s)",
                        self.tx_queue.qsize(), self.tx_queue.maxsize, m)

        if not self.send_thread.is_alive():
            self.send_thread = threading.Thread(target=self.send_loop)
            self.send_thread.start()

    def send_loop(self):
        """Send queued metrics until no send succeeded for the timeout.

        The loop ends once ``metrics_thread_stop_timeout`` seconds have
        passed since it started or since the last successful send. A
        metric whose send failed stays in ``next_send_metric`` and is
        retried after a one second pause.
        """
        timeout = cfg.CONF.oslo_messaging_metrics.metrics_thread_stop_timeout
        stoptime = time.time() + timeout
        while stoptime > time.time():
            if self.next_send_metric is None:
                try:
                    self.next_send_metric = self.tx_queue.get(timeout=timeout)
                except queue.Empty:
                    # Nothing arrived; re-evaluate the stop condition.
                    continue
            try:
                self.send_metric(self.next_send_metric)
                self.next_send_metric = None
                stoptime = time.time() + timeout
            except Exception as e:
                LOG.error("Failed to send metrics: %s. "
                          "Wait 1 seconds for next try.", e)
                time.sleep(1)

    def send_metric(self, metric):
        """Send one metric as a JSON datagram to the unix socket.

        :param metric: object whose ``to_json()`` returns a string
        :raises OSError: if connecting or sending fails; the socket is
                         closed in that case as well
        """
        # The context manager closes the socket even when connect() or
        # send() raises, so a missing collector does not leak descriptors.
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
            s.connect(self.unix_socket)
            s.send(metric.to_json().encode())

    def put_rpc_client_metrics_to_txqueue(self, metric_name, action,
                                          target, method, call_type, timeout,
                                          exception=None):
        """Enqueue an RPC client metric labelled from ``target``.

        :param metric_name: name of the metric
        :param action: ``message_type.MetricAction`` to apply
        :param target: object providing exchange, topic, namespace,
                       version, server and fanout attributes
        :param method: RPC method name
        :param call_type: kind of invocation, e.g. ``"call"`` or ``"cast"``
        :param timeout: timeout label value
        :param exception: optional exception label; only added when truthy
        """
        kwargs = {
            'call_type': call_type,
            'exchange': target.exchange,
            'topic': target.topic,
            'namespace': target.namespace,
            'version': target.version,
            'server': target.server,
            'fanout': target.fanout,
            'method': method,
            'timeout': timeout,
        }
        if exception:
            kwargs['exception'] = exception

        self.put_into_txqueue(metric_name, action, **kwargs)

    def rpc_client_invocation_start_total(self, target, method, call_type,
                                          timeout=None):
        """Increment the counter of started RPC client invocations."""
        self.put_rpc_client_metrics_to_txqueue(
            "rpc_client_invocation_start_total",
            message_type.MetricAction("inc", None),
            target, method, call_type, timeout
        )

    def rpc_client_invocation_end_total(self, target, method, call_type,
                                        timeout=None):
        """Increment the counter of finished RPC client invocations."""
        self.put_rpc_client_metrics_to_txqueue(
            "rpc_client_invocation_end_total",
            message_type.MetricAction("inc", None),
            target, method, call_type, timeout
        )

    def rpc_client_processing_seconds(self, target, method, call_type,
                                      duration, timeout=None):
        """Observe the duration in seconds of an RPC client invocation."""
        self.put_rpc_client_metrics_to_txqueue(
            "rpc_client_processing_seconds",
            message_type.MetricAction("observe", duration),
            target, method, call_type, timeout
        )

    def rpc_client_exception_total(self, target, method, call_type, exception,
                                   timeout=None):
        """Increment the counter of RPC client invocations that raised."""
        self.put_rpc_client_metrics_to_txqueue(
            "rpc_client_exception_total",
            message_type.MetricAction("inc", None),
            target, method, call_type, timeout, exception
        )

    def put_rpc_server_metrics_to_txqueue(self, metric_name, action,
                                          target, endpoint, ns, ver, method,
                                          exception=None):
        """Enqueue an RPC server metric.

        The exchange, topic and server labels are ``None`` unless a truthy
        ``target`` is given, in which case they are read from it.

        :param metric_name: name of the metric
        :param action: ``message_type.MetricAction`` to apply
        :param target: optional object providing exchange, topic and server
        :param endpoint: endpoint label value
        :param ns: namespace label value
        :param ver: version label value
        :param method: RPC method name
        :param exception: optional exception label; only added when truthy
        """
        kwargs = {
            'endpoint': endpoint,
            'namespace': ns,
            'version': ver,
            'method': method,
            'exchange': None,
            'topic': None,
            'server': None
        }
        if target:
            kwargs['exchange'] = target.exchange
            kwargs['topic'] = target.topic
            kwargs['server'] = target.server
        if exception:
            kwargs['exception'] = exception

        self.put_into_txqueue(metric_name, action, **kwargs)

    def rpc_server_invocation_start_total(self, target, endpoint,
                                          ns, ver, method):
        """Increment the counter of started RPC server invocations."""
        self.put_rpc_server_metrics_to_txqueue(
            "rpc_server_invocation_start_total",
            message_type.MetricAction("inc", None),
            target, endpoint, ns, ver, method
        )

    def rpc_server_invocation_end_total(self, target, endpoint,
                                        ns, ver, method):
        """Increment the counter of finished RPC server invocations."""
        self.put_rpc_server_metrics_to_txqueue(
            "rpc_server_invocation_end_total",
            message_type.MetricAction("inc", None),
            target, endpoint, ns, ver, method
        )

    def rpc_server_processing_seconds(self, target, endpoint, ns, ver,
                                      method, duration):
        """Observe the duration in seconds of an RPC server invocation."""
        self.put_rpc_server_metrics_to_txqueue(
            "rpc_server_processing_seconds",
            message_type.MetricAction("observe", duration),
            target, endpoint, ns, ver, method
        )

    def rpc_server_exception_total(self, target, endpoint, ns, ver,
                                   method, exception):
        """Increment the counter of RPC server invocations that raised."""
        self.put_rpc_server_metrics_to_txqueue(
            "rpc_server_exception_total",
            message_type.MetricAction("inc", None),
            target, endpoint, ns, ver, method, exception=exception
        )
METRICS_COLLECTOR = None
def get_collector(conf, metrics_type, **kwargs):
    """Return the process-wide metrics collector, creating it on first use.

    :param conf: configuration object passed to MetricsCollectorClient
                 when the collector is created
    :param metrics_type: method-name prefix used by the collector's
                         context manager, e.g. ``"rpc_client"``
    :param kwargs: keyword arguments the context manager forwards to the
                   metric methods (for the RPC client: target, method,
                   call_type)
    :returns: the shared MetricsCollectorClient instance
    """
    # Rebind this module's ``threading`` name to the module selected at
    # import time, so that the collector creates its sending thread with it.
    global threading
    threading = stdlib_threading
    global METRICS_COLLECTOR
    if METRICS_COLLECTOR is None:
        METRICS_COLLECTOR = MetricsCollectorClient(
            conf, metrics_type, **kwargs)
    else:
        # The collector is shared, so refresh what its context manager
        # reports; otherwise every later invocation would be labelled with
        # the arguments of the very first call.
        # NOTE(review): the state lives on the shared instance, so
        # overlapping ``with`` blocks can still observe each other's
        # values -- a per-call context object would be needed to fully
        # isolate them.
        METRICS_COLLECTOR.metrics_type = metrics_type
        METRICS_COLLECTOR.args = kwargs
    return METRICS_COLLECTOR

View File

@ -23,6 +23,7 @@ from oslo_config import cfg
from oslo_messaging._drivers import base as driver_base
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
from oslo_messaging import metrics
from oslo_messaging import serializer as msg_serializer
from oslo_messaging import transport as msg_transport
@ -146,12 +147,23 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
self._check_version_cap(msg.get('version'))
try:
self.transport._send(self.target, msg_ctxt, msg,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
with metrics.get_collector(self.conf, "rpc_client",
target=self.target,
method=method,
call_type="cast") as metrics_collector:
try:
self.transport._send(self.target, msg_ctxt, msg,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
self._metrics_api.rpc_client_exception_total(
self.target, method, "cast", ex.__class__.__name__)
raise ClientSendError(self.target, ex)
except Exception as ex:
if self.conf.oslo_messaging_metrics.metrics_enabled:
metrics_collector.rpc_client_exception_total(
self.target, method, "cast", ex.__class__.__name__)
raise
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
@ -170,17 +182,24 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
self._check_version_cap(msg.get('version'))
try:
result = \
self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
return self.serializer.deserialize_entity(ctxt, result)
with metrics.get_collector(self.conf, "rpc_client",
target=self.target, method=method,
call_type="call") as metrics_collector:
try:
result = self.transport._send(
self.target, msg_ctxt, msg, wait_for_reply=True,
timeout=timeout, call_monitor_timeout=cm_timeout,
retry=self.retry, transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
self._metrics_api.rpc_client_exception_total(
self.target, method, "call", ex.__class__.__name__)
raise ClientSendError(self.target, ex)
except Exception as ex:
if self.conf.oslo_messaging_metrics.metrics_enabled:
metrics_collector.rpc_client_exception_total(
self.target, method, "call", ex.__class__.__name__)
raise
return self.serializer.deserialize_entity(ctxt, result)
@abc.abstractmethod
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,

View File

@ -12,6 +12,8 @@
# under the License.
import os
import requests
import subprocess
import time
import uuid
@ -565,3 +567,35 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual('test', event[1])
self.assertEqual('Hello World!', event[2])
self.assertEqual('abc', event[3])
class MetricsTestCase(utils.SkipIfNoTransportURL):
    """Functional test for sending RPC client metrics to oslo.metrics."""

    def setUp(self):
        """Enable metrics; skip on kafka, which has no RPC API."""
        super(MetricsTestCase, self).setUp(conf=cfg.ConfigOpts())
        if self.rpc_url.startswith("kafka://"):
            self.skipTest("kafka does not support RPC API")

        self.config(metrics_enabled=True,
                    group='oslo_messaging_metrics')

    def test_functional(self):
        """One RPC call must show up once in each client metric."""
        # verify call metrics is sent and reflected in oslo.metrics
        self.config(metrics_socket_file='/var/tmp/metrics_collector.sock',
                    group='oslo_messaging_metrics')
        metric_server = subprocess.Popen(["python3", "-m", "oslo_metrics"])
        # Registered right after the spawn so the server process is stopped
        # even when a later step or assertion fails.
        self.addCleanup(metric_server.terminate)
        # Give the server time to start before metrics are sent.
        time.sleep(1)
        group = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.rpc_url))
        client = group.client(1)
        client.add(increment=1)
        # Give the sending thread time to deliver the queued metrics.
        time.sleep(1)
        r = requests.get('http://localhost:3000')
        for line in r.text.split('\n'):
            if 'client_invocation_start_total{' in line:
                self.assertEqual('1.0', line[-3:])
            elif 'client_invocation_end_total{' in line:
                self.assertEqual('1.0', line[-3:])
            elif 'client_processing_seconds_count{' in line:
                self.assertEqual('1.0', line[-3:])

View File

@ -28,3 +28,6 @@ kombu>=4.6.6 # BSD
# middleware
oslo.middleware>=3.31.0 # Apache-2.0
# metrics
oslo.metrics>=0.2.1 # Apache-2.0