Merge "Remove some profiler traces, logs, use utils.cut() where needed"
This commit is contained in:
commit
b635dcc79e
@ -27,7 +27,7 @@ from mistral.workbook import parser as spec_parser
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@profiler.trace('action-handler-on-action-complete')
|
||||
@profiler.trace('action-handler-on-action-complete', hide_args=True)
|
||||
def on_action_complete(action_ex, result):
|
||||
task_ex = action_ex.task_execution
|
||||
|
||||
@ -54,7 +54,7 @@ def on_action_complete(action_ex, result):
|
||||
task_handler.schedule_on_action_complete(action_ex)
|
||||
|
||||
|
||||
@profiler.trace('action-handler-build-action')
|
||||
@profiler.trace('action-handler-build-action', hide_args=True)
|
||||
def _build_action(action_ex):
|
||||
if isinstance(action_ex, models.WorkflowExecution):
|
||||
return actions.WorkflowAction(None, action_ex=action_ex)
|
||||
|
@ -159,7 +159,7 @@ class Action(object):
|
||||
self.action_def.action_class, self.action_def.attributes):
|
||||
input_dict.update(a_m.get_empty_action_context())
|
||||
|
||||
@profiler.trace('action-log-result')
|
||||
@profiler.trace('action-log-result', hide_args=True)
|
||||
def _log_result(self, prev_state, result):
|
||||
state = self.action_ex.state
|
||||
|
||||
@ -185,7 +185,7 @@ class Action(object):
|
||||
class PythonAction(Action):
|
||||
"""Regular Python action."""
|
||||
|
||||
@profiler.trace('action-complete')
|
||||
@profiler.trace('action-complete', hide_args=True)
|
||||
def complete(self, result):
|
||||
assert self.action_ex
|
||||
|
||||
@ -206,7 +206,7 @@ class PythonAction(Action):
|
||||
|
||||
self._log_result(prev_state, result)
|
||||
|
||||
@profiler.trace('action-schedule')
|
||||
@profiler.trace('action-schedule', hide_args=True)
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
assert not self.action_ex
|
||||
|
||||
@ -227,7 +227,7 @@ class PythonAction(Action):
|
||||
|
||||
action_queue.schedule(self.action_ex, self.action_def, target)
|
||||
|
||||
@profiler.trace('action-run')
|
||||
@profiler.trace('action-run', hide_args=True)
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=False):
|
||||
assert not self.action_ex
|
||||
@ -436,12 +436,12 @@ class AdHocAction(PythonAction):
|
||||
class WorkflowAction(Action):
|
||||
"""Workflow action."""
|
||||
|
||||
@profiler.trace('action-complete')
|
||||
@profiler.trace('workflow-action-complete', hide_args=True)
|
||||
def complete(self, result):
|
||||
# No-op because in case of workflow result is already processed.
|
||||
pass
|
||||
|
||||
@profiler.trace('action-schedule')
|
||||
@profiler.trace('workflkow-action-schedule', hide_args=True)
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
assert not self.action_ex
|
||||
|
||||
@ -485,7 +485,7 @@ class WorkflowAction(Action):
|
||||
wf_params
|
||||
)
|
||||
|
||||
@profiler.trace('action-run')
|
||||
@profiler.trace('workflow-action-run', hide_args=True)
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=True):
|
||||
raise NotImplementedError('Does not apply to this WorkflowAction.')
|
||||
|
@ -14,7 +14,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
@ -28,8 +27,6 @@ from mistral import exceptions
|
||||
from mistral import utils as u
|
||||
from mistral.workflow import states
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Submodules of mistral.engine will throw NoSuchOptError if configuration
|
||||
# options required at top level of this __init__.py are not imported before
|
||||
@ -38,7 +35,6 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class DefaultEngine(base.Engine):
|
||||
@action_queue.process
|
||||
@u.log_exec(LOG)
|
||||
@profiler.trace('engine-start-workflow')
|
||||
def start_workflow(self, wf_identifier, wf_input, description='',
|
||||
**params):
|
||||
@ -54,7 +50,6 @@ class DefaultEngine(base.Engine):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@action_queue.process
|
||||
@u.log_exec(LOG)
|
||||
def start_action(self, action_name, action_input,
|
||||
description=None, **params):
|
||||
with db_api.transaction():
|
||||
@ -107,8 +102,7 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
@u.log_exec(LOG)
|
||||
@profiler.trace('engine-on-action-complete')
|
||||
@profiler.trace('engine-on-action-complete', hide_args=True)
|
||||
def on_action_complete(self, action_ex_id, result, wf_action=False,
|
||||
async_=False):
|
||||
with db_api.transaction():
|
||||
@ -121,7 +115,6 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
return action_ex.get_clone()
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def pause_workflow(self, wf_ex_id):
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
@ -131,7 +124,6 @@ class DefaultEngine(base.Engine):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@action_queue.process
|
||||
@u.log_exec(LOG)
|
||||
def rerun_workflow(self, task_ex_id, reset=True, env=None):
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
@ -143,7 +135,6 @@ class DefaultEngine(base.Engine):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@action_queue.process
|
||||
@u.log_exec(LOG)
|
||||
def resume_workflow(self, wf_ex_id, env=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
@ -152,7 +143,6 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def stop_workflow(self, wf_ex_id, state, message=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
@ -161,7 +151,6 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def rollback_workflow(self, wf_ex_id):
|
||||
# TODO(rakhmerov): Implement.
|
||||
raise NotImplementedError
|
||||
|
@ -31,7 +31,7 @@ class DefaultExecutor(base.Executor):
|
||||
def __init__(self):
|
||||
self._engine_client = rpc.get_engine_client()
|
||||
|
||||
@profiler.trace('executor-run-action')
|
||||
@profiler.trace('executor-run-action', hide_args=True)
|
||||
def run_action(self, action_ex_id, action_class_str, attributes,
|
||||
action_params, safe_rerun, redelivered=False):
|
||||
"""Runs action.
|
||||
|
@ -13,7 +13,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
||||
from mistral import config as cfg
|
||||
from mistral.db.v2 import api as db_api
|
||||
@ -22,6 +21,7 @@ from mistral.engine.rpc_backend import rpc
|
||||
from mistral.service import base as service_base
|
||||
from mistral.services import expiration_policy
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
from mistral.utils import profiler as profiler_utils
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
@ -94,9 +94,9 @@ class EngineServer(service_base.MistralService):
|
||||
LOG.info(
|
||||
"Received RPC request 'start_workflow'[rpc_ctx=%s,"
|
||||
" workflow_identifier=%s, workflow_input=%s, description=%s, "
|
||||
"params=%s]"
|
||||
% (rpc_ctx, workflow_identifier, workflow_input, description,
|
||||
params)
|
||||
"params=%s]" %
|
||||
(rpc_ctx, workflow_identifier, utils.cut(workflow_input),
|
||||
description, params)
|
||||
)
|
||||
|
||||
return self.engine.start_workflow(
|
||||
@ -120,7 +120,8 @@ class EngineServer(service_base.MistralService):
|
||||
LOG.info(
|
||||
"Received RPC request 'start_action'[rpc_ctx=%s,"
|
||||
" name=%s, input=%s, description=%s, params=%s]"
|
||||
% (rpc_ctx, action_name, action_input, description, params)
|
||||
% (rpc_ctx, action_name, utils.cut(action_input),
|
||||
description, params)
|
||||
)
|
||||
|
||||
return self.engine.start_action(
|
||||
@ -130,7 +131,6 @@ class EngineServer(service_base.MistralService):
|
||||
**params
|
||||
)
|
||||
|
||||
@profiler.trace('engine-server-on-action-complete')
|
||||
def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
|
||||
result_error, wf_action):
|
||||
"""Receives RPC calls to communicate action result to engine.
|
||||
@ -147,7 +147,8 @@ class EngineServer(service_base.MistralService):
|
||||
|
||||
LOG.info(
|
||||
"Received RPC request 'on_action_complete'[rpc_ctx=%s,"
|
||||
" action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result)
|
||||
" action_ex_id=%s, result=%s]" %
|
||||
(rpc_ctx, action_ex_id, result.cut_repr())
|
||||
)
|
||||
|
||||
return self.engine.on_action_complete(action_ex_id, result, wf_action)
|
||||
@ -218,7 +219,8 @@ class EngineServer(service_base.MistralService):
|
||||
|
||||
LOG.info(
|
||||
"Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s,"
|
||||
" state=%s, message=%s]" % (rpc_ctx, execution_id, state, message)
|
||||
" state=%s, message=%s]" %
|
||||
(rpc_ctx, execution_id, state, message)
|
||||
)
|
||||
|
||||
return self.engine.stop_workflow(execution_id, state, message)
|
||||
|
@ -18,6 +18,7 @@ from mistral import config as cfg
|
||||
from mistral.engine import default_executor
|
||||
from mistral.engine.rpc_backend import rpc
|
||||
from mistral.service import base as service_base
|
||||
from mistral import utils
|
||||
from mistral.utils import profiler as profiler_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -74,7 +75,8 @@ class ExecutorServer(service_base.MistralService):
|
||||
LOG.info(
|
||||
"Received RPC request 'run_action'[rpc_ctx=%s,"
|
||||
" action_ex_id=%s, action_class=%s, attributes=%s, params=%s]"
|
||||
% (rpc_ctx, action_ex_id, action_class_str, attributes, params)
|
||||
% (rpc_ctx, action_ex_id, action_class_str, attributes,
|
||||
utils.cut(params))
|
||||
)
|
||||
|
||||
redelivered = rpc_ctx.redelivered or False
|
||||
|
@ -193,7 +193,7 @@ class EngineClient(base.Engine):
|
||||
)
|
||||
|
||||
@wrap_messaging_exception
|
||||
@profiler.trace('engine-client-on-action-complete')
|
||||
@profiler.trace('engine-client-on-action-complete', hide_args=True)
|
||||
def on_action_complete(self, action_ex_id, result, wf_action=False,
|
||||
async_=False):
|
||||
"""Conveys action result to Mistral Engine.
|
||||
@ -331,6 +331,7 @@ class ExecutorClient(base.Executor):
|
||||
self.topic = cfg.CONF.executor.topic
|
||||
self._client = get_rpc_client_driver()(rpc_conf_dict)
|
||||
|
||||
@profiler.trace('executor-client-run-action')
|
||||
def run_action(self, action_ex_id, action_class_str, attributes,
|
||||
action_params, target=None, async_=True, safe_rerun=False):
|
||||
"""Sends a request to run action to executor.
|
||||
|
@ -45,7 +45,7 @@ _SCHEDULED_ON_ACTION_COMPLETE_PATH = (
|
||||
)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-run-task')
|
||||
@profiler.trace('task-handler-run-task', hide_args=True)
|
||||
def run_task(wf_cmd):
|
||||
"""Runs workflow task.
|
||||
|
||||
@ -77,7 +77,7 @@ def run_task(wf_cmd):
|
||||
_schedule_refresh_task_state(task.task_ex, 1)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-on-action-complete')
|
||||
@profiler.trace('task-handler-on-action-complete', hide_args=True)
|
||||
def _on_action_complete(action_ex):
|
||||
"""Handles action completion event.
|
||||
|
||||
@ -207,7 +207,7 @@ def _build_task_from_execution(wf_spec, task_ex):
|
||||
)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-build-task-from-command')
|
||||
@profiler.trace('task-handler-build-task-from-command', hide_args=True)
|
||||
def _build_task_from_command(cmd):
|
||||
if isinstance(cmd, wf_cmds.RunExistingTask):
|
||||
task = _create_task(
|
||||
@ -251,7 +251,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
|
||||
|
||||
|
||||
@action_queue.process
|
||||
@profiler.trace('task-handler-refresh-task-state')
|
||||
@profiler.trace('task-handler-refresh-task-state', hide_args=True)
|
||||
def _refresh_task_state(task_ex_id):
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.load_task_execution(task_ex_id)
|
||||
|
@ -43,7 +43,6 @@ class Task(object):
|
||||
Mistral engine or its components in order to manipulate with tasks.
|
||||
"""
|
||||
|
||||
@profiler.trace('task-create')
|
||||
def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None,
|
||||
unique_key=None, waiting=False):
|
||||
self.wf_ex = wf_ex
|
||||
@ -253,7 +252,7 @@ class RegularTask(Task):
|
||||
Takes care of processing regular tasks with one action.
|
||||
"""
|
||||
|
||||
@profiler.trace('regular-task-on-action-complete')
|
||||
@profiler.trace('regular-task-on-action-complete', hide_args=True)
|
||||
def on_action_complete(self, action_ex):
|
||||
state = action_ex.state
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
@ -352,7 +351,7 @@ class RegularTask(Task):
|
||||
safe_rerun=self.task_spec.get_safe_rerun()
|
||||
)
|
||||
|
||||
@profiler.trace('regular-task-get-target')
|
||||
@profiler.trace('regular-task-get-target', hide_args=True)
|
||||
def _get_target(self, input_dict):
|
||||
ctx_view = data_flow.ContextView(
|
||||
input_dict,
|
||||
@ -366,7 +365,7 @@ class RegularTask(Task):
|
||||
ctx_view
|
||||
)
|
||||
|
||||
@profiler.trace('regular-task-get-action-input')
|
||||
@profiler.trace('regular-task-get-action-input', hide_args=True)
|
||||
def _get_action_input(self, ctx=None):
|
||||
ctx = ctx or self.ctx
|
||||
|
||||
@ -426,7 +425,7 @@ class WithItemsTask(RegularTask):
|
||||
_CAPACITY: 0
|
||||
}
|
||||
|
||||
@profiler.trace('with-items-task-on-action-complete')
|
||||
@profiler.trace('with-items-task-on-action-complete', hide_args=True)
|
||||
def on_action_complete(self, action_ex):
|
||||
assert self.task_ex
|
||||
|
||||
|
@ -14,7 +14,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral import utils
|
||||
from mistral.utils import serializers
|
||||
from osprofiler import profiler
|
||||
|
||||
|
||||
class Result(object):
|
||||
@ -26,8 +28,20 @@ class Result(object):
|
||||
self.cancel = cancel
|
||||
|
||||
def __repr__(self):
|
||||
try:
|
||||
profiler.start("action-result-repr")
|
||||
import traceback
|
||||
traceback.print_stack()
|
||||
|
||||
return 'Result [data=%s, error=%s, cancel=%s]' % (
|
||||
repr(self.data), repr(self.error), str(self.cancel)
|
||||
)
|
||||
finally:
|
||||
profiler.stop()
|
||||
|
||||
def cut_repr(self):
|
||||
return 'Result [data=%s, error=%s, cancel=%s]' % (
|
||||
repr(self.data), repr(self.error), str(self.cancel)
|
||||
utils.cut(self.data), utils.cut(self.error), str(self.cancel)
|
||||
)
|
||||
|
||||
def is_cancel(self):
|
||||
|
Loading…
Reference in New Issue
Block a user