From 191eaa443ad858511d0476ef103bb5276e4d99d4 Mon Sep 17 00:00:00 2001 From: Alexei Kornienko Date: Fri, 23 Aug 2013 12:39:59 +0300 Subject: [PATCH 3/3] Added RPC trace Change-Id: I4b602abdf766df57217da2c705f05ff0cb6afe55 --- keystone/openstack/common/rpc/__init__.py | 9 +++++++++ keystone/openstack/common/rpc/amqp.py | 10 ++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/keystone/openstack/common/rpc/__init__.py b/keystone/openstack/common/rpc/__init__.py index 248a745..3588ed6 100644 --- a/keystone/openstack/common/rpc/__init__.py +++ b/keystone/openstack/common/rpc/__init__.py @@ -27,6 +27,8 @@ For some wrappers that add message versioning to rpc, see: import inspect +import tomograph + from oslo.config import cfg from keystone.openstack.common.gettextutils import _ # noqa @@ -137,6 +139,7 @@ def call(context, topic, msg, timeout=None, check_for_lock=False): """ if check_for_lock: _check_for_lock() + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().call(CONF, context, topic, msg, timeout) @@ -155,6 +158,7 @@ def cast(context, topic, msg): :returns: None """ + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().cast(CONF, context, topic, msg) @@ -176,6 +180,7 @@ def fanout_cast(context, topic, msg): :returns: None """ + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().fanout_cast(CONF, context, topic, msg) @@ -210,6 +215,7 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False): """ if check_for_lock: _check_for_lock() + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().multicall(CONF, context, topic, msg, timeout) @@ -224,6 +230,7 @@ def notify(context, topic, msg, envelope=False): :returns: None """ + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) @@ -252,6 +259,7 @@ def cast_to_server(context, server_params, topic, msg): :returns: None """ + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().cast_to_server(CONF, context, server_params, topic, msg) @@ -268,6 +276,7 @@ def fanout_cast_to_server(context, server_params, topic, msg): :returns: None """ + msg['trace_info'] = tomograph.get_trace_info() return _get_impl().fanout_cast_to_server(CONF, context, server_params, topic, msg) diff --git a/keystone/openstack/common/rpc/amqp.py b/keystone/openstack/common/rpc/amqp.py index 3bcedbd..d7ae97a 100644 --- a/keystone/openstack/common/rpc/amqp.py +++ b/keystone/openstack/common/rpc/amqp.py @@ -28,6 +28,8 @@ AMQP, but is deprecated and predates this code. import collections import inspect import sys +import socket +import tomograph import uuid from eventlet import greenpool @@ -417,15 +419,16 @@ class ProxyCallback(_ThreadPoolWithWait): args = message_data.get('args', {}) version = message_data.get('version') namespace = message_data.get('namespace') + trace_info = message_data.get('trace_info', None) if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return self.pool.spawn_n(self._process_data, ctxt, version, method, - namespace, args) + namespace, args, trace_info) - def _process_data(self, ctxt, version, method, namespace, args): + def _process_data(self, ctxt, version, method, namespace, args, trace_info): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -434,6 +437,7 @@ class ProxyCallback(_ThreadPoolWithWait): the old behavior of magically calling the specified method on the proxy we have here. """ + tomograph.start("rpc", str(method), socket.gethostname(), 0, trace_info) ctxt.update_store() try: rval = self.proxy.dispatch(ctxt, version, method, namespace, @@ -458,6 +462,8 @@ class ProxyCallback(_ThreadPoolWithWait): LOG.error(_('Exception during message handling'), exc_info=exc_info) ctxt.reply(None, exc_info, connection_pool=self.connection_pool) + finally: + tomograph.stop(str(method)) class MulticallProxyWaiter(object): -- 1.8.1.2