Remove some profiler traces, logs, use utils.cut() where needed

* At some places like engine_server.py profiler traces aren't
  really needed because its methods are wrappers for the engine.
  Tracing the engine is enough.
* Added "hide_args=True" for those profiler traces where we
  potentially have big data passed in function/method arguments.
  With this option, profiler never builds string representations
  for them which saves lots of CPU.
* Removed obsolege @u.log_exec(LOG) decorator from engine.
  In fact, it's a less advanced verion of @profiler.trace() so
  it's not needed anymore.
* Use utils.cut() where we can potentially have big string
  representation. Mostly in log statements. For example, building
  a string representation for a dict with 20000 short keys and
  values takes 30-40 ms on MacBook Pro. If we have many statements
  like this it becomes noticeable. Up to a half a second.

Change-Id: I409c5347ec92834b5ad61e29e00e099a4744dc58
This commit is contained in:
Renat Akhmerov 2017-02-01 15:02:13 +07:00
parent 4b6cca49b3
commit f16f4a3f3e
10 changed files with 49 additions and 42 deletions

View File

@ -27,7 +27,7 @@ from mistral.workbook import parser as spec_parser
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@profiler.trace('action-handler-on-action-complete') @profiler.trace('action-handler-on-action-complete', hide_args=True)
def on_action_complete(action_ex, result): def on_action_complete(action_ex, result):
task_ex = action_ex.task_execution task_ex = action_ex.task_execution
@ -54,7 +54,7 @@ def on_action_complete(action_ex, result):
task_handler.schedule_on_action_complete(action_ex) task_handler.schedule_on_action_complete(action_ex)
@profiler.trace('action-handler-build-action') @profiler.trace('action-handler-build-action', hide_args=True)
def _build_action(action_ex): def _build_action(action_ex):
if isinstance(action_ex, models.WorkflowExecution): if isinstance(action_ex, models.WorkflowExecution):
return actions.WorkflowAction(None, action_ex=action_ex) return actions.WorkflowAction(None, action_ex=action_ex)

View File

@ -159,7 +159,7 @@ class Action(object):
self.action_def.action_class, self.action_def.attributes): self.action_def.action_class, self.action_def.attributes):
input_dict.update(a_m.get_empty_action_context()) input_dict.update(a_m.get_empty_action_context())
@profiler.trace('action-log-result') @profiler.trace('action-log-result', hide_args=True)
def _log_result(self, prev_state, result): def _log_result(self, prev_state, result):
state = self.action_ex.state state = self.action_ex.state
@ -185,7 +185,7 @@ class Action(object):
class PythonAction(Action): class PythonAction(Action):
"""Regular Python action.""" """Regular Python action."""
@profiler.trace('action-complete') @profiler.trace('action-complete', hide_args=True)
def complete(self, result): def complete(self, result):
assert self.action_ex assert self.action_ex
@ -206,7 +206,7 @@ class PythonAction(Action):
self._log_result(prev_state, result) self._log_result(prev_state, result)
@profiler.trace('action-schedule') @profiler.trace('action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
assert not self.action_ex assert not self.action_ex
@ -227,7 +227,7 @@ class PythonAction(Action):
action_queue.schedule(self.action_ex, self.action_def, target) action_queue.schedule(self.action_ex, self.action_def, target)
@profiler.trace('action-run') @profiler.trace('action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True, def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=False): safe_rerun=False):
assert not self.action_ex assert not self.action_ex
@ -436,12 +436,12 @@ class AdHocAction(PythonAction):
class WorkflowAction(Action): class WorkflowAction(Action):
"""Workflow action.""" """Workflow action."""
@profiler.trace('action-complete') @profiler.trace('workflow-action-complete', hide_args=True)
def complete(self, result): def complete(self, result):
# No-op because in case of workflow result is already processed. # No-op because in case of workflow result is already processed.
pass pass
@profiler.trace('action-schedule') @profiler.trace('workflkow-action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
assert not self.action_ex assert not self.action_ex
@ -485,7 +485,7 @@ class WorkflowAction(Action):
wf_params wf_params
) )
@profiler.trace('action-run') @profiler.trace('workflow-action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True, def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=True): safe_rerun=True):
raise NotImplementedError('Does not apply to this WorkflowAction.') raise NotImplementedError('Does not apply to this WorkflowAction.')

View File

@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from oslo_log import log as logging
from osprofiler import profiler from osprofiler import profiler
from mistral.db import utils as db_utils from mistral.db import utils as db_utils
@ -28,8 +27,6 @@ from mistral import exceptions
from mistral import utils as u from mistral import utils as u
from mistral.workflow import states from mistral.workflow import states
LOG = logging.getLogger(__name__)
# Submodules of mistral.engine will throw NoSuchOptError if configuration # Submodules of mistral.engine will throw NoSuchOptError if configuration
# options required at top level of this __init__.py are not imported before # options required at top level of this __init__.py are not imported before
@ -38,7 +35,6 @@ LOG = logging.getLogger(__name__)
class DefaultEngine(base.Engine): class DefaultEngine(base.Engine):
@action_queue.process @action_queue.process
@u.log_exec(LOG)
@profiler.trace('engine-start-workflow') @profiler.trace('engine-start-workflow')
def start_workflow(self, wf_identifier, wf_input, description='', def start_workflow(self, wf_identifier, wf_input, description='',
**params): **params):
@ -54,7 +50,6 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone() return wf_ex.get_clone()
@action_queue.process @action_queue.process
@u.log_exec(LOG)
def start_action(self, action_name, action_input, def start_action(self, action_name, action_input,
description=None, **params): description=None, **params):
with db_api.transaction(): with db_api.transaction():
@ -107,8 +102,7 @@ class DefaultEngine(base.Engine):
@db_utils.retry_on_deadlock @db_utils.retry_on_deadlock
@action_queue.process @action_queue.process
@u.log_exec(LOG) @profiler.trace('engine-on-action-complete', hide_args=True)
@profiler.trace('engine-on-action-complete')
def on_action_complete(self, action_ex_id, result, wf_action=False, def on_action_complete(self, action_ex_id, result, wf_action=False,
async=False): async=False):
with db_api.transaction(): with db_api.transaction():
@ -121,7 +115,6 @@ class DefaultEngine(base.Engine):
return action_ex.get_clone() return action_ex.get_clone()
@u.log_exec(LOG)
def pause_workflow(self, wf_ex_id): def pause_workflow(self, wf_ex_id):
with db_api.transaction(): with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id) wf_ex = db_api.get_workflow_execution(wf_ex_id)
@ -131,7 +124,6 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone() return wf_ex.get_clone()
@action_queue.process @action_queue.process
@u.log_exec(LOG)
def rerun_workflow(self, task_ex_id, reset=True, env=None): def rerun_workflow(self, task_ex_id, reset=True, env=None):
with db_api.transaction(): with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id) task_ex = db_api.get_task_execution(task_ex_id)
@ -143,7 +135,6 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone() return wf_ex.get_clone()
@action_queue.process @action_queue.process
@u.log_exec(LOG)
def resume_workflow(self, wf_ex_id, env=None): def resume_workflow(self, wf_ex_id, env=None):
with db_api.transaction(): with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id) wf_ex = db_api.get_workflow_execution(wf_ex_id)
@ -152,7 +143,6 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone() return wf_ex.get_clone()
@u.log_exec(LOG)
def stop_workflow(self, wf_ex_id, state, message=None): def stop_workflow(self, wf_ex_id, state, message=None):
with db_api.transaction(): with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id) wf_ex = db_api.get_workflow_execution(wf_ex_id)
@ -161,7 +151,6 @@ class DefaultEngine(base.Engine):
return wf_ex.get_clone() return wf_ex.get_clone()
@u.log_exec(LOG)
def rollback_workflow(self, wf_ex_id): def rollback_workflow(self, wf_ex_id):
# TODO(rakhmerov): Implement. # TODO(rakhmerov): Implement.
raise NotImplementedError raise NotImplementedError

View File

@ -31,7 +31,7 @@ class DefaultExecutor(base.Executor):
def __init__(self): def __init__(self):
self._engine_client = rpc.get_engine_client() self._engine_client = rpc.get_engine_client()
@profiler.trace('executor-run-action') @profiler.trace('executor-run-action', hide_args=True)
def run_action(self, action_ex_id, action_class_str, attributes, def run_action(self, action_ex_id, action_class_str, attributes,
action_params, safe_rerun, redelivered=False): action_params, safe_rerun, redelivered=False):
"""Runs action. """Runs action.

View File

@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
from oslo_log import log as logging from oslo_log import log as logging
from osprofiler import profiler
from mistral import config as cfg from mistral import config as cfg
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
@ -22,6 +21,7 @@ from mistral.engine.rpc_backend import rpc
from mistral.service import base as service_base from mistral.service import base as service_base
from mistral.services import expiration_policy from mistral.services import expiration_policy
from mistral.services import scheduler from mistral.services import scheduler
from mistral import utils
from mistral.utils import profiler as profiler_utils from mistral.utils import profiler as profiler_utils
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
@ -94,9 +94,9 @@ class EngineServer(service_base.MistralService):
LOG.info( LOG.info(
"Received RPC request 'start_workflow'[rpc_ctx=%s," "Received RPC request 'start_workflow'[rpc_ctx=%s,"
" workflow_identifier=%s, workflow_input=%s, description=%s, " " workflow_identifier=%s, workflow_input=%s, description=%s, "
"params=%s]" "params=%s]" %
% (rpc_ctx, workflow_identifier, workflow_input, description, (rpc_ctx, workflow_identifier, utils.cut(workflow_input),
params) description, params)
) )
return self.engine.start_workflow( return self.engine.start_workflow(
@ -120,7 +120,8 @@ class EngineServer(service_base.MistralService):
LOG.info( LOG.info(
"Received RPC request 'start_action'[rpc_ctx=%s," "Received RPC request 'start_action'[rpc_ctx=%s,"
" name=%s, input=%s, description=%s, params=%s]" " name=%s, input=%s, description=%s, params=%s]"
% (rpc_ctx, action_name, action_input, description, params) % (rpc_ctx, action_name, utils.cut(action_input),
description, params)
) )
return self.engine.start_action( return self.engine.start_action(
@ -130,7 +131,6 @@ class EngineServer(service_base.MistralService):
**params **params
) )
@profiler.trace('engine-server-on-action-complete')
def on_action_complete(self, rpc_ctx, action_ex_id, result_data, def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
result_error, wf_action): result_error, wf_action):
"""Receives RPC calls to communicate action result to engine. """Receives RPC calls to communicate action result to engine.
@ -147,7 +147,8 @@ class EngineServer(service_base.MistralService):
LOG.info( LOG.info(
"Received RPC request 'on_action_complete'[rpc_ctx=%s," "Received RPC request 'on_action_complete'[rpc_ctx=%s,"
" action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result) " action_ex_id=%s, result=%s]" %
(rpc_ctx, action_ex_id, result.cut_repr())
) )
return self.engine.on_action_complete(action_ex_id, result, wf_action) return self.engine.on_action_complete(action_ex_id, result, wf_action)
@ -218,7 +219,8 @@ class EngineServer(service_base.MistralService):
LOG.info( LOG.info(
"Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s," "Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s,"
" state=%s, message=%s]" % (rpc_ctx, execution_id, state, message) " state=%s, message=%s]" %
(rpc_ctx, execution_id, state, message)
) )
return self.engine.stop_workflow(execution_id, state, message) return self.engine.stop_workflow(execution_id, state, message)

View File

@ -18,6 +18,7 @@ from mistral import config as cfg
from mistral.engine import default_executor from mistral.engine import default_executor
from mistral.engine.rpc_backend import rpc from mistral.engine.rpc_backend import rpc
from mistral.service import base as service_base from mistral.service import base as service_base
from mistral import utils
from mistral.utils import profiler as profiler_utils from mistral.utils import profiler as profiler_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -74,7 +75,8 @@ class ExecutorServer(service_base.MistralService):
LOG.info( LOG.info(
"Received RPC request 'run_action'[rpc_ctx=%s," "Received RPC request 'run_action'[rpc_ctx=%s,"
" action_ex_id=%s, action_class=%s, attributes=%s, params=%s]" " action_ex_id=%s, action_class=%s, attributes=%s, params=%s]"
% (rpc_ctx, action_ex_id, action_class_str, attributes, params) % (rpc_ctx, action_ex_id, action_class_str, attributes,
utils.cut(params))
) )
redelivered = rpc_ctx.redelivered or False redelivered = rpc_ctx.redelivered or False

View File

@ -193,7 +193,7 @@ class EngineClient(base.Engine):
) )
@wrap_messaging_exception @wrap_messaging_exception
@profiler.trace('engine-client-on-action-complete') @profiler.trace('engine-client-on-action-complete', hide_args=True)
def on_action_complete(self, action_ex_id, result, wf_action=False, def on_action_complete(self, action_ex_id, result, wf_action=False,
async=False): async=False):
"""Conveys action result to Mistral Engine. """Conveys action result to Mistral Engine.
@ -331,6 +331,7 @@ class ExecutorClient(base.Executor):
self.topic = cfg.CONF.executor.topic self.topic = cfg.CONF.executor.topic
self._client = get_rpc_client_driver()(rpc_conf_dict) self._client = get_rpc_client_driver()(rpc_conf_dict)
@profiler.trace('executor-client-run-action')
def run_action(self, action_ex_id, action_class_str, attributes, def run_action(self, action_ex_id, action_class_str, attributes,
action_params, target=None, async=True, safe_rerun=False): action_params, target=None, async=True, safe_rerun=False):
"""Sends a request to run action to executor. """Sends a request to run action to executor.

View File

@ -45,7 +45,7 @@ _SCHEDULED_ON_ACTION_COMPLETE_PATH = (
) )
@profiler.trace('task-handler-run-task') @profiler.trace('task-handler-run-task', hide_args=True)
def run_task(wf_cmd): def run_task(wf_cmd):
"""Runs workflow task. """Runs workflow task.
@ -77,7 +77,7 @@ def run_task(wf_cmd):
_schedule_refresh_task_state(task.task_ex, 1) _schedule_refresh_task_state(task.task_ex, 1)
@profiler.trace('task-handler-on-action-complete') @profiler.trace('task-handler-on-action-complete', hide_args=True)
def _on_action_complete(action_ex): def _on_action_complete(action_ex):
"""Handles action completion event. """Handles action completion event.
@ -207,7 +207,7 @@ def _build_task_from_execution(wf_spec, task_ex):
) )
@profiler.trace('task-handler-build-task-from-command') @profiler.trace('task-handler-build-task-from-command', hide_args=True)
def _build_task_from_command(cmd): def _build_task_from_command(cmd):
if isinstance(cmd, wf_cmds.RunExistingTask): if isinstance(cmd, wf_cmds.RunExistingTask):
task = _create_task( task = _create_task(
@ -251,7 +251,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
@action_queue.process @action_queue.process
@profiler.trace('task-handler-refresh-task-state') @profiler.trace('task-handler-refresh-task-state', hide_args=True)
def _refresh_task_state(task_ex_id): def _refresh_task_state(task_ex_id):
with db_api.transaction(): with db_api.transaction():
task_ex = db_api.load_task_execution(task_ex_id) task_ex = db_api.load_task_execution(task_ex_id)

View File

@ -43,7 +43,6 @@ class Task(object):
Mistral engine or its components in order to manipulate with tasks. Mistral engine or its components in order to manipulate with tasks.
""" """
@profiler.trace('task-create')
def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None, def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None,
unique_key=None, waiting=False): unique_key=None, waiting=False):
self.wf_ex = wf_ex self.wf_ex = wf_ex
@ -253,7 +252,7 @@ class RegularTask(Task):
Takes care of processing regular tasks with one action. Takes care of processing regular tasks with one action.
""" """
@profiler.trace('regular-task-on-action-complete') @profiler.trace('regular-task-on-action-complete', hide_args=True)
def on_action_complete(self, action_ex): def on_action_complete(self, action_ex):
state = action_ex.state state = action_ex.state
# TODO(rakhmerov): Here we can define more informative messages # TODO(rakhmerov): Here we can define more informative messages
@ -352,7 +351,7 @@ class RegularTask(Task):
safe_rerun=self.task_spec.get_safe_rerun() safe_rerun=self.task_spec.get_safe_rerun()
) )
@profiler.trace('regular-task-get-target') @profiler.trace('regular-task-get-target', hide_args=True)
def _get_target(self, input_dict): def _get_target(self, input_dict):
ctx_view = data_flow.ContextView( ctx_view = data_flow.ContextView(
input_dict, input_dict,
@ -366,7 +365,7 @@ class RegularTask(Task):
ctx_view ctx_view
) )
@profiler.trace('regular-task-get-action-input') @profiler.trace('regular-task-get-action-input', hide_args=True)
def _get_action_input(self, ctx=None): def _get_action_input(self, ctx=None):
ctx = ctx or self.ctx ctx = ctx or self.ctx
@ -426,7 +425,7 @@ class WithItemsTask(RegularTask):
_CAPACITY: 0 _CAPACITY: 0
} }
@profiler.trace('with-items-task-on-action-complete') @profiler.trace('with-items-task-on-action-complete', hide_args=True)
def on_action_complete(self, action_ex): def on_action_complete(self, action_ex):
assert self.task_ex assert self.task_ex

View File

@ -14,7 +14,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from mistral import utils
from mistral.utils import serializers from mistral.utils import serializers
from osprofiler import profiler
class Result(object): class Result(object):
@ -26,9 +28,21 @@ class Result(object):
self.cancel = cancel self.cancel = cancel
def __repr__(self): def __repr__(self):
try:
profiler.start("action-result-repr")
import traceback
traceback.print_stack()
return 'Result [data=%s, error=%s, cancel=%s]' % ( return 'Result [data=%s, error=%s, cancel=%s]' % (
repr(self.data), repr(self.error), str(self.cancel) repr(self.data), repr(self.error), str(self.cancel)
) )
finally:
profiler.stop()
def cut_repr(self):
return 'Result [data=%s, error=%s, cancel=%s]' % (
utils.cut(self.data), utils.cut(self.error), str(self.cancel)
)
def is_cancel(self): def is_cancel(self):
return self.cancel return self.cancel