diff --git a/doc/source/admin/configuration/config-guide.rst b/doc/source/admin/configuration/config-guide.rst index 0d8dfc23d..5b48cae2e 100644 --- a/doc/source/admin/configuration/config-guide.rst +++ b/doc/source/admin/configuration/config-guide.rst @@ -131,6 +131,17 @@ directory. For more details see `policy.yaml file `_. +#. Modify the logging configuration if needed. + + The default log format is compatible with Graylog; however, the log + format is configurable. Mistral uses the ``oslo.log`` library for + logging. For configuration details, refer to the official oslo.log + configuration reference at + https://docs.openstack.org/oslo.log/latest/configuration/index.html. + You can include the ``root_execution_id`` in log records as follows:: + + logging_context_format_string = [%(asctime)s,%(msecs)03d][%(levelname)-5s][category=%(name)s][pid=%(process)d][root_execution_id=%(root_execution_id)s] %(message)s + #. Modify the action execution reporting configuration if needed. It is possible that actions stuck in *"RUNNING"* state, for example if the diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index bef75404a..7cea63340 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -124,16 +124,22 @@ class ExecutionsController(rest.RestController): """ acl.enforce('executions:update', context.ctx()) - LOG.debug('Update execution [id=%s, execution=%s]', id, wf_ex) + LOG.info('Update execution [id=%s, execution=%s]', id, wf_ex) @rest_utils.rest_retry_on_db_error def _compute_delta(wf_ex): with db_api.transaction(): # ensure that workflow execution exists - db_api.get_workflow_execution( + wf_ex_old = db_api.get_workflow_execution( id, - fields=(db_models.WorkflowExecution.id,) + fields=(db_models.WorkflowExecution.id, + db_models.WorkflowExecution.root_execution_id) ) + root_execution_id = wf_ex_old.root_execution_id + if not root_execution_id: + root_execution_id = wf_ex_old.id + +
context.ctx(root_execution_id=root_execution_id) delta = {} @@ -234,7 +240,7 @@ class ExecutionsController(rest.RestController): """ acl.enforce('executions:create', context.ctx()) - LOG.debug("Create execution [execution=%s]", wf_ex) + LOG.info("Create execution [execution=%s]", wf_ex) exec_dict = wf_ex.to_dict() @@ -242,6 +248,7 @@ class ExecutionsController(rest.RestController): if not exec_id: exec_id = uuidutils.generate_uuid() + context.ctx(root_execution_id=exec_id) LOG.debug("Generated execution id [exec_id=%s]", exec_id) @@ -253,6 +260,7 @@ class ExecutionsController(rest.RestController): # If yes, the method just returns the object. If not, the ID # will be used to create a new execution. wf_ex = _get_workflow_execution(exec_id, must_exist=False) + context.ctx(root_execution_id=exec_id) if wf_ex: return resources.Execution.from_db_model(wf_ex) diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 518bde5bf..ce177913a 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -322,7 +322,7 @@ class TasksController(rest.RestController): """ acl.enforce('tasks:update', context.ctx()) - LOG.debug("Update task execution [id=%s, task=%s]", id, task) + LOG.info("Update task execution [id=%s, task=%s]", id, task) @rest_utils.rest_retry_on_db_error def _read_task_params(id, task): @@ -340,6 +340,12 @@ class TasksController(rest.RestController): task_ex.workflow_execution_id ) + root_execution_id = wf_ex.root_execution_id + if not root_execution_id: + root_execution_id = wf_ex.id + + context.ctx(root_execution_id=root_execution_id) + return env, reset, task_ex, task_spec, wf_ex env, reset, task_ex, task_spec, wf_ex = _read_task_params(id, task) diff --git a/mistral/context.py b/mistral/context.py index cd04b0522..b57b44c2b 100644 --- a/mistral/context.py +++ b/mistral/context.py @@ -40,10 +40,13 @@ ALLOWED_WITHOUT_AUTH = ['/', '/v2/', '/workflowv2/', '/workflowv2/v2/'] class 
MistralContext(oslo_context.RequestContext): + + FROM_DICT_EXTRA_KEYS = ['root_execution_id'] + def __init__(self, auth_uri=None, auth_cacert=None, insecure=False, service_catalog=None, region_name=None, is_trust_scoped=False, redelivered=False, expires_at=None, trust_id=None, - is_target=False, **kwargs): + is_target=False, root_execution_id=None, **kwargs): self.auth_uri = auth_uri self.auth_cacert = auth_cacert self.insecure = insecure @@ -54,11 +57,25 @@ class MistralContext(oslo_context.RequestContext): self.expires_at = expires_at self.trust_id = trust_id self.is_target = is_target + self.root_execution_id = root_execution_id # We still use Mistral thread local variable. Maybe could consider # using the variable provided by oslo_context in future. super(MistralContext, self).__init__(overwrite=False, **kwargs) + def get_logging_values(self): + ctx_dict = super(MistralContext, self).get_logging_values() + ctx_dict.update( + { + 'root_execution_id': self.root_execution_id, + } + ) + + return ctx_dict + + def update_store(self): + super(MistralContext, self).update_store() + def to_dict(self): """Return a dictionary of context attributes.""" ctx_dict = super(MistralContext, self).to_dict() @@ -79,6 +96,7 @@ class MistralContext(oslo_context.RequestContext): 'expires_at': self.expires_at, 'trust_id': self.trust_id, 'is_target': self.is_target, + 'root_execution_id': self.root_execution_id, } ) @@ -99,6 +117,7 @@ class MistralContext(oslo_context.RequestContext): kwargs.setdefault('expires_at', values.get('expires_at')) kwargs.setdefault('trust_id', values.get('trust_id')) kwargs.setdefault('is_target', values.get('is_target', False)) + kwargs.setdefault('root_execution_id', values.get('root_execution_id')) return super(MistralContext, cls).from_dict(values, **kwargs) @@ -122,14 +141,22 @@ def has_ctx(): return utils.has_thread_local(_CTX_THREAD_LOCAL_NAME) -def ctx(): +def ctx(root_execution_id=None): if not has_ctx(): raise 
exc.ApplicationContextNotFoundException() - return utils.get_thread_local(_CTX_THREAD_LOCAL_NAME) + context = utils.get_thread_local(_CTX_THREAD_LOCAL_NAME) + + if root_execution_id: + context.root_execution_id = root_execution_id + set_ctx(context) + + return context def set_ctx(new_ctx): + if new_ctx: + new_ctx.update_store() utils.set_thread_local(_CTX_THREAD_LOCAL_NAME, new_ctx) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 3f9d27364..a259faa3e 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -184,6 +184,9 @@ class Action(object, metaclass=abc.ABCMeta): 'project_id': security.get_project_id(), }) + LOG.info("Create action execution [action_name=%s, action_ex_id=%s]", + self.action_desc.name, action_ex_id) + self.action_ex = db_api.create_action_execution(values) if self.task_ex: @@ -198,10 +201,12 @@ class Action(object, metaclass=abc.ABCMeta): if prev_state != state: wf_trace.info( None, - "Action '%s' (%s)(task=%s) [%s -> %s, %s]" % + "Action '%s' (%s)(task_name=%s, " + "task_ex_id=%s) [%s -> %s, %s]" % (self.action_ex.name, self.action_ex.id, self.task_ex.name if self.task_ex else None, + self.task_ex.id if self.task_ex else None, prev_state, state, result.cut_repr()) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index c5aa43006..588cde96d 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -483,9 +483,11 @@ class Task(object, metaclass=abc.ABCMeta): @profiler.trace('task-create-task-execution') def _create_task_execution(self, state=None, state_info=None): + task_id = utils.generate_unicode_uuid() + task_name = self.task_spec.get_name() values = { - 'id': utils.generate_unicode_uuid(), - 'name': self.task_spec.get_name(), + 'id': task_id, + 'name': task_name, 'workflow_execution_id': self.wf_ex.id, 'workflow_name': self.wf_ex.workflow_name, 'workflow_namespace': self.wf_ex.workflow_namespace, @@ -505,6 +507,9 @@ class Task(object, metaclass=abc.ABCMeta): if 
self.triggered_by: values['runtime_context']['triggered_by'] = self.triggered_by + LOG.info("Create task execution [task_name=%s, task_ex_id=%s, " + "wf_ex_id=%s]", task_name, task_id, self.wf_ex.id) + self.task_ex = db_api.create_task_execution(values) self.created = True diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 823d31889..9998d4a9f 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -367,6 +367,11 @@ class Workflow(object, metaclass=abc.ABCMeta): self.wf_ex = db_api.create_workflow_execution(values) + LOG.info("Created workflow execution [workflow_name=%s, wf_ex_id=%s, " + "task_execution_id=%s, root_execution_id=%s]", wf_def.name, + self.wf_ex.id, params.get('task_execution_id'), + params.get('root_execution_id')) + self.wf_ex.input = input_dict or {} params['env'] = _get_environment(params) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index b5a2195a9..81d8a3db1 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -138,6 +138,15 @@ class EngineClient(eng.Engine): call = self._client.async_call if async_ else self._client.sync_call + LOG.info( + "Send RPC request 'start_workflow'[workflow_identifier=%s, " + "workflow_input=%s, description=%s, params=%s]", + wf_identifier, + wf_input, + description, + params + ) + return call( auth_ctx.ctx(), 'start_workflow', @@ -199,6 +208,13 @@ class EngineClient(eng.Engine): call = self._client.async_call if async_ else self._client.sync_call + LOG.info( + "Send RPC request 'on_action_complete'[action_ex_id=%s, " + "result=%s]", + action_ex_id, + result.cut_repr() if result else None + ) + return call( auth_ctx.ctx(), 'on_action_complete', @@ -233,6 +249,13 @@ class EngineClient(eng.Engine): call = self._client.async_call if async_ else self._client.sync_call + LOG.info( + "Send RPC request 'on_action_update'" + "[action_ex_id=%s, state=%s]", + action_ex_id, + state + ) + return call( auth_ctx.ctx(), 'on_action_update', @@ -249,6 
+272,11 @@ class EngineClient(eng.Engine): :return: Workflow execution. """ + LOG.info( + "Send RPC request 'pause_workflow'[execution_id=%s]", + wf_ex_id + ) + return self._client.sync_call( auth_ctx.ctx(), 'pause_workflow', @@ -269,6 +297,11 @@ class EngineClient(eng.Engine): :return: Workflow execution. """ + LOG.info( + "Send RPC request 'rerun_workflow'[task_ex_id=%s]", + task_ex_id + ) + return self._client.sync_call( auth_ctx.ctx(), 'rerun_workflow', @@ -286,6 +319,11 @@ class EngineClient(eng.Engine): :return: Workflow execution. """ + LOG.info( + "Send RPC request 'resume_workflow'[wf_ex_id=%s]", + wf_ex_id + ) + return self._client.sync_call( auth_ctx.ctx(), 'resume_workflow', @@ -307,6 +345,14 @@ class EngineClient(eng.Engine): :return: Workflow execution, model.Execution """ + LOG.info( + "Send RPC request 'stop_workflow'[execution_id=%s," + " state=%s, message=%s]", + wf_ex_id, + state, + message + ) + return self._client.sync_call( auth_ctx.ctx(), 'stop_workflow', @@ -324,6 +370,11 @@ class EngineClient(eng.Engine): :return: Workflow execution. """ + LOG.info( + "Send RPC request 'rollback_workflow'[execution_id=%s]", + wf_ex_id + ) + return self._client.sync_call( auth_ctx.ctx(), 'rollback_workflow', @@ -336,6 +387,12 @@ class EngineClient(eng.Engine): :param action_ex_ids: Action execution ids.
""" + + LOG.info( + "Send RPC request 'report_running_actions'[action_ex_ids=%s]", + action_ex_ids + ) + return self._client.async_call( auth_ctx.ctx(), 'report_running_actions', @@ -385,8 +442,8 @@ class ExecutorClient(exe.Executor): else self._client.sync_call ) - LOG.debug( - 'Sending an action to executor [action=%s, action_ex_id=%s]', + LOG.info( + "Send RPC request 'run_action' [action=%s, action_ex_id=%s]", action, action_ex_id ) @@ -402,6 +459,10 @@ class EventEngineClient(evt_eng.EventEngine): self._client = base.get_rpc_client_driver()(rpc_conf_dict) def create_event_trigger(self, trigger, events): + LOG.info( + "Send RPC request 'create_event_trigger'[trigger=%s, " + "events=%s]", trigger, events + ) return self._client.async_call( auth_ctx.ctx(), 'create_event_trigger', @@ -411,6 +472,10 @@ class EventEngineClient(evt_eng.EventEngine): ) def delete_event_trigger(self, trigger, events): + LOG.info( + "Send RPC request 'delete_event_trigger'[trigger=%s, " + "events=%s]", trigger, events + ) return self._client.async_call( auth_ctx.ctx(), 'delete_event_trigger', @@ -420,6 +485,10 @@ class EventEngineClient(evt_eng.EventEngine): ) def update_event_trigger(self, trigger): + LOG.info( + "Send RPC request 'update_event_trigger'" + "[trigger=%s]", trigger + ) return self._client.async_call( auth_ctx.ctx(), 'update_event_trigger', diff --git a/mistral/services/action_heartbeat_checker.py b/mistral/services/action_heartbeat_checker.py index a603ff6e4..89ebdb337 100644 --- a/mistral/services/action_heartbeat_checker.py +++ b/mistral/services/action_heartbeat_checker.py @@ -18,6 +18,7 @@ import eventlet from mistral import context as auth_ctx from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api +from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler from mistral.engine import post_tx_queue from mistral_lib import actions as mistral_lib @@ -61,10 +62,30 @@ def handle_expired_actions():
) for action_ex in action_exs: + task_ex = db_api.get_task_execution( + action_ex.task_execution_id + ) + wf_ex = db_api.get_workflow_execution( + task_ex.workflow_execution_id, + fields=(db_models.WorkflowExecution.id, + db_models.WorkflowExecution.root_execution_id) + ) + + if wf_ex.root_execution_id: + root_execution_id = wf_ex.root_execution_id + else: + root_execution_id = wf_ex.id + result = mistral_lib.Result( error="Heartbeat wasn't received." ) + auth_ctx.set_ctx( + auth_ctx.MistralContext( + root_execution_id=root_execution_id + ) + ) + action_handler.on_action_complete(action_ex, result) diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 7873b1f33..ec60afb5d 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -29,6 +29,7 @@ from webtest import app as webtest_app from mistral.api.controllers.v2 import execution from mistral.api.controllers.v2 import resources +from mistral import context from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import api as sql_db_api from mistral.db.v2.sqlalchemy import models @@ -214,7 +215,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) @mock.patch.object( rpc_clients.EngineClient, @@ -238,7 +239,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) @mock.patch.object(rpc_clients.EngineClient, 'stop_workflow') def test_put_state_error(self, mock_stop_wf): @@ -266,7 +267,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) @mock.patch.object(rpc_clients.EngineClient, 'stop_workflow') def 
test_put_state_cancelled(self, mock_stop_wf): @@ -299,7 +300,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) @mock.patch.object(rpc_clients.EngineClient, 'resume_workflow') def test_put_state_resume(self, mock_resume_wf): @@ -326,7 +327,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) def test_put_invalid_state(self): invalid_states = [states.IDLE, states.WAITING, states.RUNNING_DELAYED] @@ -353,7 +354,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) @mock.patch.object(rpc_clients.EngineClient, 'stop_workflow') def test_put_state_info_unset(self, mock_stop_wf): @@ -391,7 +392,8 @@ class TestExecutionsController(base.APITest): mock_ensure.assert_called_once_with( '123', - fields=(models.WorkflowExecution.id,) + fields=(models.WorkflowExecution.id, + models.WorkflowExecution.root_execution_id) ) mock_update.assert_called_once_with('123', update_params) @@ -427,7 +429,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) def test_put_empty(self): resp = self.app.put_json('/v2/executions/123', {}, expect_errors=True) @@ -441,7 +443,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + mock.MagicMock(return_value=WF_EX) ) def test_put_state_and_description(self): resp = self.app.put_json( @@ -487,7 +489,7 @@ class TestExecutionsController(base.APITest): @mock.patch.object( db_api, 'get_workflow_execution', - mock.MagicMock(return_value=None) + 
mock.MagicMock(return_value=WF_EX) ) def test_put_env_wrong_state(self): update_exec = { @@ -1008,3 +1010,23 @@ class TestExecutionsController(base.APITest): "Object not found", resp.body.decode() ) + + @mock.patch.object(rpc_clients.EngineClient, 'start_workflow') + def test_root_execution_id_present_in_logging_values(self, start_wf_func): + # NOTE: In fact, we use "white box" testing here to understand + # if the REST controller calls other APIs as expected. This is + # the only way of testing available with the current testing + # infrastructure. + wf_ex_dict = WF_EX.to_dict() + start_wf_func.return_value = wf_ex_dict + json_body = WF_EX_JSON_WITH_DESC.copy() + exp_root_execution_id = WF_EX_JSON["id"] + + with mock.patch("mistral.context.set_ctx") as mocked_set_cxt: + self.app.post_json('/v2/executions', json_body) + calls = mocked_set_cxt.call_args_list + ctx = calls[0][0][0] + self.assertIsInstance(ctx, context.MistralContext) + logging_values = ctx.get_logging_values() + self.assertEqual(exp_root_execution_id, + logging_values["root_execution_id"]) diff --git a/mistral/utils/wf_trace.py b/mistral/utils/wf_trace.py index bf1db6fe4..85f6a0934 100644 --- a/mistral/utils/wf_trace.py +++ b/mistral/utils/wf_trace.py @@ -36,9 +36,11 @@ def info(obj, msg, *args, **kvargs): if type(obj) is models.TaskExecution: exec_id = obj.workflow_execution_id task_id = obj.id - debug_info = '(execution_id=%s task_id=%s)' % (exec_id, task_id) + debug_info = '(wf_ex_id=%s, task_ex_id=%s)' % (exec_id, task_id) + elif type(obj) is models.WorkflowExecution: - debug_info = '(execution_id=%s)' % obj.id + debug_info = '(execution_id=%s,' \ + 'task_ex_id=%s)' % (obj.id, obj.task_execution_id) if debug_info: msg = '%s %s' % (msg, debug_info)