diff --git a/mistral/actions/std_actions.py b/mistral/actions/std_actions.py index d6a7258f..34aed227 100644 --- a/mistral/actions/std_actions.py +++ b/mistral/actions/std_actions.py @@ -226,13 +226,14 @@ class MistralHTTPAction(HTTPAction): proxies=None, verify=None): - a_ctx = action_context + actx = action_context headers = headers or {} headers.update({ - 'Mistral-Workflow-Name': a_ctx.get('workflow_name'), - 'Mistral-Execution-Id': a_ctx.get('workflow_execution_id'), - 'Mistral-Task-Id': a_ctx.get('task_id'), + 'Mistral-Workflow-Name': actx.get('workflow_name'), + 'Mistral-Workflow-Execution-Id': actx.get('workflow_execution_id'), + 'Mistral-Task-Id': actx.get('task_id'), + 'Mistral-Action-Execution-Id': actx.get('action_execution_id'), }) super(MistralHTTPAction, self).__init__( diff --git a/mistral/db/sqlalchemy/model_base.py b/mistral/db/sqlalchemy/model_base.py index a5b07ed9..8edd52e9 100644 --- a/mistral/db/sqlalchemy/model_base.py +++ b/mistral/db/sqlalchemy/model_base.py @@ -15,8 +15,6 @@ # limitations under the License. -import uuid - from oslo.db.sqlalchemy import models as oslo_models import sqlalchemy as sa from sqlalchemy import event @@ -27,15 +25,11 @@ from mistral.services import security from mistral import utils -def _generate_unicode_uuid(): - return unicode(str(uuid.uuid4())) - - def id_column(): return sa.Column( sa.String(36), primary_key=True, - default=_generate_unicode_uuid + default=utils.generate_unicode_uuid ) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 4f9043ff..58629711 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -164,7 +164,27 @@ def _create_task_execution(wf_ex, task_spec, ctx): def _create_action_execution(task_ex, action_def, action_input, index=0): + # TODO(rakhmerov): We can avoid hitting DB at all when calling something + # create_action_execution(), these operations can be just done using + # SQLAlchemy session (1-level cache) and session flush (on TX commit) would + # send necessary SQL queries to DB. Currently, session flush happens + # on every operation which may not be optimal. The problem with using just + # session level cache is in generating ids. Ids are generated only on + # session flush. And now we have a lot places where we need to have ids + # before TX completion. + + # Assign the action execution ID here to minimize database calls. + # Otherwise, the input property of the action execution DB object needs + # to be updated with the action execution ID after the action execution + # DB object is created. + action_ex_id = utils.generate_unicode_uuid() + + if a_m.has_action_context( + action_def.action_class, action_def.attributes or {}): + action_input.update(a_m.get_action_context(task_ex, action_ex_id)) + action_ex = db_api.create_action_execution({ + 'id': action_ex_id, 'name': action_def.name, 'task_execution_id': task_ex.id, 'workflow_name': task_ex.workflow_name, @@ -322,10 +342,6 @@ def _get_action_input(wf_spec, task_ex, task_spec, ctx): else: input_dict = {} - if a_m.has_action_context( - action_def.action_class, action_def.attributes or {}): - input_dict.update(a_m.get_action_context(task_ex)) - return input_dict diff --git a/mistral/services/action_manager.py b/mistral/services/action_manager.py index 3bfcd420..4cec45f2 100644 --- a/mistral/services/action_manager.py +++ b/mistral/services/action_manager.py @@ -146,14 +146,15 @@ def get_action_class(action_full_name): ) -def get_action_context(task_ex): +def get_action_context(task_ex, action_ex_id): return { _ACTION_CTX_PARAM: { 'workflow_name': task_ex.workflow_name, 'workflow_execution_id': task_ex.workflow_execution_id, 'task_id': task_ex.id, 'task_name': task_ex.name, - 'task_tags': task_ex.tags + 'task_tags': task_ex.tags, + 'action_execution_id': action_ex_id } } diff --git a/mistral/tests/unit/engine/test_action_context.py b/mistral/tests/unit/engine/test_action_context.py index 5a06e7ca..97eba69d 100644 --- a/mistral/tests/unit/engine/test_action_context.py +++ b/mistral/tests/unit/engine/test_action_context.py @@ -85,11 +85,13 @@ class ActionContextTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_ex.state) task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') + action_ex = self._assert_single_item(task_ex.executions) headers = { 'Mistral-Workflow-Name': wf_ex.workflow_name, + 'Mistral-Workflow-Execution-Id': wf_ex.id, 'Mistral-Task-Id': task_ex.id, - 'Mistral-Execution-Id': wf_ex.id + 'Mistral-Action-Execution-Id': action_ex.id } requests.request.assert_called_with( diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index be3f50f0..67f8f437 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -19,6 +19,7 @@ import logging import os from os import path import threading +import uuid import eventlet from eventlet import corolocal @@ -28,10 +29,15 @@ import random from mistral import version + # Thread local storage. _th_loc_storage = threading.local() +def generate_unicode_uuid(): + return unicode(str(uuid.uuid4())) + + def _get_greenlet_local_storage(): greenlet_id = corolocal.get_ident()