Add action execution ID to action context
With introduction of action execution, for supported action classes, the action_context passed as input args to the action class during action execution should include the action execution ID. Change-Id: I49d88c57aceef92f548f3680980040f78a4cdfb4 Closes-Bug: #1444155
This commit is contained in:
parent
f2a1a63ced
commit
6288035e82
@ -226,13 +226,14 @@ class MistralHTTPAction(HTTPAction):
|
||||
proxies=None,
|
||||
verify=None):
|
||||
|
||||
a_ctx = action_context
|
||||
actx = action_context
|
||||
|
||||
headers = headers or {}
|
||||
headers.update({
|
||||
'Mistral-Workflow-Name': a_ctx.get('workflow_name'),
|
||||
'Mistral-Execution-Id': a_ctx.get('workflow_execution_id'),
|
||||
'Mistral-Task-Id': a_ctx.get('task_id'),
|
||||
'Mistral-Workflow-Name': actx.get('workflow_name'),
|
||||
'Mistral-Workflow-Execution-Id': actx.get('workflow_execution_id'),
|
||||
'Mistral-Task-Id': actx.get('task_id'),
|
||||
'Mistral-Action-Execution-Id': actx.get('action_execution_id'),
|
||||
})
|
||||
|
||||
super(MistralHTTPAction, self).__init__(
|
||||
|
@ -15,8 +15,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import uuid
|
||||
|
||||
from oslo.db.sqlalchemy import models as oslo_models
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import event
|
||||
@ -27,15 +25,11 @@ from mistral.services import security
|
||||
from mistral import utils
|
||||
|
||||
|
||||
def _generate_unicode_uuid():
|
||||
return unicode(str(uuid.uuid4()))
|
||||
|
||||
|
||||
def id_column():
|
||||
return sa.Column(
|
||||
sa.String(36),
|
||||
primary_key=True,
|
||||
default=_generate_unicode_uuid
|
||||
default=utils.generate_unicode_uuid
|
||||
)
|
||||
|
||||
|
||||
|
@ -164,7 +164,27 @@ def _create_task_execution(wf_ex, task_spec, ctx):
|
||||
|
||||
|
||||
def _create_action_execution(task_ex, action_def, action_input, index=0):
|
||||
# TODO(rakhmerov): We can avoid hitting DB at all when calling something
|
||||
# create_action_execution(), these operations can be just done using
|
||||
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
|
||||
# send necessary SQL queries to DB. Currently, session flush happens
|
||||
# on every operation which may not be optimal. The problem with using just
|
||||
# session level cache is in generating ids. Ids are generated only on
|
||||
# session flush. And now we have a lot places where we need to have ids
|
||||
# before TX completion.
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
# Otherwise, the input property of the action execution DB object needs
|
||||
# to be updated with the action execution ID after the action execution
|
||||
# DB object is created.
|
||||
action_ex_id = utils.generate_unicode_uuid()
|
||||
|
||||
if a_m.has_action_context(
|
||||
action_def.action_class, action_def.attributes or {}):
|
||||
action_input.update(a_m.get_action_context(task_ex, action_ex_id))
|
||||
|
||||
action_ex = db_api.create_action_execution({
|
||||
'id': action_ex_id,
|
||||
'name': action_def.name,
|
||||
'task_execution_id': task_ex.id,
|
||||
'workflow_name': task_ex.workflow_name,
|
||||
@ -322,10 +342,6 @@ def _get_action_input(wf_spec, task_ex, task_spec, ctx):
|
||||
else:
|
||||
input_dict = {}
|
||||
|
||||
if a_m.has_action_context(
|
||||
action_def.action_class, action_def.attributes or {}):
|
||||
input_dict.update(a_m.get_action_context(task_ex))
|
||||
|
||||
return input_dict
|
||||
|
||||
|
||||
|
@ -146,14 +146,15 @@ def get_action_class(action_full_name):
|
||||
)
|
||||
|
||||
|
||||
def get_action_context(task_ex):
|
||||
def get_action_context(task_ex, action_ex_id):
|
||||
return {
|
||||
_ACTION_CTX_PARAM: {
|
||||
'workflow_name': task_ex.workflow_name,
|
||||
'workflow_execution_id': task_ex.workflow_execution_id,
|
||||
'task_id': task_ex.id,
|
||||
'task_name': task_ex.name,
|
||||
'task_tags': task_ex.tags
|
||||
'task_tags': task_ex.tags,
|
||||
'action_execution_id': action_ex_id
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,11 +85,13 @@ class ActionContextTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state)
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
|
||||
action_ex = self._assert_single_item(task_ex.executions)
|
||||
|
||||
headers = {
|
||||
'Mistral-Workflow-Name': wf_ex.workflow_name,
|
||||
'Mistral-Workflow-Execution-Id': wf_ex.id,
|
||||
'Mistral-Task-Id': task_ex.id,
|
||||
'Mistral-Execution-Id': wf_ex.id
|
||||
'Mistral-Action-Execution-Id': action_ex.id
|
||||
}
|
||||
|
||||
requests.request.assert_called_with(
|
||||
|
@ -19,6 +19,7 @@ import logging
|
||||
import os
|
||||
from os import path
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
import eventlet
|
||||
from eventlet import corolocal
|
||||
@ -28,10 +29,15 @@ import random
|
||||
|
||||
from mistral import version
|
||||
|
||||
|
||||
# Thread local storage.
|
||||
_th_loc_storage = threading.local()
|
||||
|
||||
|
||||
def generate_unicode_uuid():
|
||||
return unicode(str(uuid.uuid4()))
|
||||
|
||||
|
||||
def _get_greenlet_local_storage():
|
||||
greenlet_id = corolocal.get_ident()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user