Headers propagation from an execution request to actions/notifications

Currently Mistal doesn`t support distribution tracing, which means
that there`s no way to propagate traceId/spanId from an execution to
related actions/notifications.
It leads us to an issue, because according to distribution tracing
principles "traceId" should remain the same during the discovery
process, which includes Mistral worflow execution.
Proposed solution is to keep some headers from execution request
and propagate them later to actions/notifications.
Regexp can be stored as env variable to define headers which
will be propagated.

Implements blueprint add-headers-propagation
Signed-off-by: Vadim Zelenevsky <wortellen@gmail.com>
Change-Id: I91ef9126d263fba89a2b0898d554de5ab44382fc
This commit is contained in:
Vadim Zelenevsky 2023-10-30 11:07:01 +06:00 committed by Zelenevskii Vadim
parent 9a011cb277
commit 6c9276d3e2
14 changed files with 403 additions and 1 deletions

View File

@ -50,6 +50,16 @@ directory.
cert_file = <path-to-certificate file>
key_file = <path-to-key file>
#. If you want to use headers propogation from execution's headers into actions
and notifications, provide following options in config file::
[headers_propagation]
enabled = True
template = Regex1, Regex2, Regex3
Be sure not to use `.*`, otherwise you can ruin your http actions
(for example, by propagation 'Content-Length' header).
#. **If you don't use OpenStack or you want to disable authentication for the
Mistral service**, provide ``auth_enable = False`` in the config file::

View File

@ -380,3 +380,31 @@ Task skip could be performed by following request::
"id": "<task-id>",
"state": "SKIPPED"
}
Headers Propagation
-------------------
Headers that were used in request to start execution, can be propagated
into actions and notifications.
In actions, they will be available via action context:
.. code-block:: python
class TestHeadersAction(actions.Action):
def __init__(self):
self.headers = None
def run(self, context):
self.headers = context.execution.headers
To configure this feature, you should define `headers_propagation` section
in your **config file**:
.. code-block:: cfg
[headers_propagation]
enabled = True
template = Regex1, Regex2, Regex3
Be sure not to use `.*`, otherwise you can ruin your http actions
(for example, by propagation 'Content-Length' header).

View File

@ -233,6 +233,12 @@ class HTTPAction(actions.Action):
else:
action_verify = None
if context.execution.workflow_propagated_headers:
if not self.headers:
self.headers = {}
self.headers.update(
context.execution.workflow_propagated_headers)
resp = requests.request(
self.method,
self.url,

View File

@ -17,9 +17,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
import pecan
from pecan import rest
import re
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
@ -307,6 +310,18 @@ class ExecutionsController(rest.RestController):
engine = rpc.get_engine_client()
if cfg.CONF.headers_propagation.enabled:
if 'params' not in result_exec_dict:
result_exec_dict['params'] = {}
result_exec_dict['params']['headers'] = {}
template = cfg.CONF.headers_propagation.template
for h_name, h_value in pecan.request.headers.items():
for t in template:
if re.match(t, h_name):
result_exec_dict['params']['headers'][h_name] = h_value
result = engine.start_workflow(
result_exec_dict.get(
'workflow_id',

View File

@ -207,6 +207,22 @@ pecan_opts = [
)
]
headers_propagation_opts = [
cfg.BoolOpt(
'enabled',
default=False,
help=_('Parameter for enabling header propagation.')
),
cfg.ListOpt(
'template',
default='.*',
help=_(
'Template (regex) to sort if header '
'should be propagated or not.'
)
),
]
engine_opts = [
cfg.StrOpt('engine', default='default', help='Mistral engine plugin'),
cfg.HostAddressOpt(
@ -766,6 +782,7 @@ CRON_TRIGGER_GROUP = 'cron_trigger'
EVENT_ENGINE_GROUP = 'event_engine'
NOTIFIER_GROUP = 'notifier'
PECAN_GROUP = 'pecan'
HEADERS_PROP_GROUP = 'headers_propagation'
COORDINATION_GROUP = 'coordination'
EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy'
ACTION_HEARTBEAT_GROUP = 'action_heartbeat'
@ -809,6 +826,7 @@ CONF.register_opts(context_versioning_opts, group=CONTEXT_VERSIONING_GROUP)
CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP)
CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP)
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
CONF.register_opts(headers_propagation_opts, group=HEADERS_PROP_GROUP)
CONF.register_opts(coordination_opts, group=COORDINATION_GROUP)
CONF.register_opts(profiler_opts, group=PROFILER_GROUP)
CONF.register_opts(keycloak_oidc_opts, group=KEYCLOAK_OIDC_GROUP)
@ -856,6 +874,7 @@ def list_opts():
(CRON_TRIGGER_GROUP, cron_trigger_opts),
(NOTIFIER_GROUP, notifier_opts),
(PECAN_GROUP, pecan_opts),
(HEADERS_PROP_GROUP, headers_propagation_opts),
(COORDINATION_GROUP, coordination_opts),
(EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts),
(PROFILER_GROUP, profiler_opts),

View File

@ -328,6 +328,9 @@ def create_action_context(execution_ctx):
service_catalog=context.service_catalog,
trust_id=context.trust_id,
)
if 'headers' in execution_ctx:
temp_headers = execution_ctx.pop('headers')
execution_ctx['workflow_propagated_headers'] = temp_headers
ex_ctx = lib_ctx.ExecutionContext(**execution_ctx)

View File

@ -148,6 +148,9 @@ class Action(object, metaclass=abc.ABCMeta):
res['task_execution_id'] = self.task_ex.id
res['workflow_name'] = wf_ex.name
if 'params' in wf_ex and 'headers' in wf_ex['params']:
res['headers'] = wf_ex['params']['headers']
if self.action_ex:
res['action_execution_id'] = self.action_ex.id
res['callback_url'] = (
@ -401,6 +404,9 @@ class WorkflowAction(Action):
if 'notify' in parent_wf_ex.params:
wf_params['notify'] = parent_wf_ex.params['notify']
if 'headers' in parent_wf_ex.params:
wf_params['headers'] = parent_wf_ex.params['headers']
for k, v in list(input_dict.items()):
if k not in wf_spec.get_input():
wf_params[k] = v

View File

@ -108,6 +108,9 @@ class Task(object, metaclass=abc.ABCMeta):
"finished_at": utils.datetime_to_str(self.task_ex.finished_at)
}
if 'params' in self.wf_ex and 'headers' in self.wf_ex['params']:
data['headers'] = self.wf_ex['params']['headers']
def _send_notification():
notifier.notify(
self.task_ex.id,

View File

@ -103,6 +103,9 @@ class Workflow(object, metaclass=abc.ABCMeta):
"updated_at": utils.datetime_to_str(self.wf_ex.updated_at)
}
if 'params' in self.wf_ex and 'headers' in self.wf_ex['params']:
data['headers'] = self.wf_ex['params']['headers']
def _send_notification():
notifier.notify(
self.wf_ex.id,

View File

@ -30,6 +30,10 @@ class WebhookPublisher(base.NotificationPublisher):
url = kwargs.get('url')
headers = kwargs.get('headers', {})
if 'headers' in data:
headers.update(data['headers'])
del data['headers']
resp = requests.post(url, data=json.dumps(data), headers=headers)
LOG.info("Webook request url=%s code=%s", url, resp.status_code)

View File

@ -64,6 +64,8 @@ class HTTPActionTest(base.BaseTest):
def test_http_action(self, mocked_method):
mocked_method.return_value = get_success_fake_response()
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
action = std.HTTPAction(
url=URL,
@ -104,6 +106,8 @@ class HTTPActionTest(base.BaseTest):
def test_http_action_error_result(self, mocked_method):
mocked_method.return_value = get_error_fake_response()
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
action = std.HTTPAction(
url=URL,
@ -122,6 +126,8 @@ class HTTPActionTest(base.BaseTest):
def test_http_action_with_auth(self, mocked_method):
mocked_method.return_value = get_success_fake_response()
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
action = std.HTTPAction(
url=URL,
@ -138,6 +144,8 @@ class HTTPActionTest(base.BaseTest):
def test_http_action_with_headers(self, mocked_method):
mocked_method.return_value = get_success_fake_response()
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
headers = {'int_header': 33, 'bool_header': True,
'float_header': 3.0, 'regular_header': 'teststring'}
@ -169,7 +177,11 @@ class HTTPActionTest(base.BaseTest):
mocked_method.return_value = get_fake_response(
content=content, code=200
)
result = action.run(mock.Mock())
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
result = action.run(mock_ctx)
self.assertEqual(content, result['content'])
invoke(None)
@ -186,6 +198,9 @@ class HTTPActionTest(base.BaseTest):
content='', code=200, encoding=None
)
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
result = action.run(mock_ctx)
self.assertIsNone(result['encoding'])
@ -206,6 +221,8 @@ class HTTPActionTest(base.BaseTest):
content='', code=201
)
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
with self.assertLogs(logger=ACTION_LOGGER, level='INFO') as logs:
action.run(mock_ctx)
@ -236,6 +253,8 @@ class HTTPActionTest(base.BaseTest):
content='', code=200
)
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
with self.assertLogs(logger=ACTION_LOGGER, level='INFO') as logs:
action.run(mock_ctx)
@ -262,6 +281,8 @@ class HTTPActionTest(base.BaseTest):
content=sensitive_data, code=200
)
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
with self.assertLogs(logger=ACTION_LOGGER, level='INFO') as logs:
action.run(mock_ctx)
@ -272,3 +293,24 @@ class HTTPActionTest(base.BaseTest):
msg = "Response body hidden due to action_logging configuration."
self.assertNotIn(sensitive_data, log)
self.assertIn(msg, log)
@mock.patch.object(requests, 'request')
def test_http_action_get_headers_from_context(self, mocked_method):
headers = {'Header1': "qwerty", 'Header2': "123",
'Header3': "wow"}
mocked_method.return_value = get_success_fake_response()
mock_ctx = mock.Mock()
mock_ctx.execution.workflow_propagated_headers = headers
action = std.HTTPAction(
url=URL,
method='GET',
headers=headers.copy(),
)
result = action.run(mock_ctx)
self.assertIn('headers', result)
args, kwargs = mocked_method.call_args
self.assertEqual(headers, kwargs['headers'])

View File

@ -60,6 +60,8 @@ class MistralHTTPActionTest(base.BaseTest):
def test_http_action(self, mocked_method):
mocked_method.return_value = get_success_fake_response()
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
action = std.MistralHTTPAction(
url=URL,
@ -110,6 +112,8 @@ class MistralHTTPActionTest(base.BaseTest):
def test_http_action_error_result(self, mocked_method):
mocked_method.return_value = get_error_fake_response()
mock_ctx = mock.Mock()
mock_headers = {}
mock_ctx.execution.workflow_propagated_headers = mock_headers
action = std.MistralHTTPAction(
url=URL,

View File

@ -1046,3 +1046,142 @@ class TestExecutionsController(base.APITest):
logging_values = ctx.get_logging_values()
self.assertEqual(exp_root_execution_id,
logging_values["root_execution_id"])
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_headers_propagation_disabled(self, start_wf_func):
self.override_config(
'enabled',
False,
group='headers_propagation'
)
headers = {'Header1': "qwerty", 'Header2': "123",
'Wrongheader': "wow"}
wf_ex_dict = WF_EX.to_dict()
start_wf_func.return_value = wf_ex_dict
json_body = WF_EX_JSON_WITH_DESC.copy()
expected_json = WF_EX_JSON_WITH_DESC
resp = self.app.post_json('/v2/executions', json_body, headers=headers)
self.assertEqual(201, resp.status_int)
self.assertDictEqual(expected_json, resp.json)
kwargs = json.loads(expected_json['params'])
kwargs['description'] = expected_json['description']
start_wf_func.assert_called_once_with(
expected_json['workflow_id'],
'',
wf_ex_dict['id'],
json.loads(expected_json['input']),
**kwargs
)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_headers_propagation_enabled_wrong_headers_skip(self,
start_wf_func):
self.override_config(
'enabled',
True,
group='headers_propagation'
)
self.override_config(
'template',
'Header*',
group='headers_propagation'
)
headers = {'Header1': "qwerty", 'Header2': "123",
'Wrongheader': "wow"}
excepted_headers = {'Header1': 'qwerty', 'Header2': '123'}
wf_ex_dict = WF_EX.to_dict()
start_wf_func.return_value = wf_ex_dict
json_body = WF_EX_JSON_WITH_DESC.copy()
expected_json = WF_EX_JSON_WITH_DESC
resp = self.app.post_json('/v2/executions', json_body, headers=headers)
self.assertEqual(201, resp.status_int)
self.assertDictEqual(expected_json, resp.json)
kwargs = json.loads(expected_json['params'])
kwargs['description'] = expected_json['description']
start_wf_func.assert_called_once_with(
expected_json['workflow_id'],
'',
wf_ex_dict['id'],
json.loads(expected_json['input']),
headers=excepted_headers,
**kwargs
)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_headers_propagation_enabled_multiple_headers(self, start_wf_func):
self.override_config(
'enabled',
True,
group='headers_propagation'
)
self.override_config(
'template',
'Header*, New*',
group='headers_propagation'
)
headers = {'Header1': "qwerty", 'Header2': "123",
'Newheader1': "wow", 'Newheader2': "wow"}
excepted_headers = {'Header1': 'qwerty', 'Header2': '123',
'Newheader1': "wow", 'Newheader2': "wow"}
wf_ex_dict = WF_EX.to_dict()
start_wf_func.return_value = wf_ex_dict
json_body = WF_EX_JSON_WITH_DESC.copy()
expected_json = WF_EX_JSON_WITH_DESC
resp = self.app.post_json('/v2/executions', json_body, headers=headers)
self.assertEqual(201, resp.status_int)
self.assertDictEqual(expected_json, resp.json)
kwargs = json.loads(expected_json['params'])
kwargs['description'] = expected_json['description']
start_wf_func.assert_called_once_with(
expected_json['workflow_id'],
'',
wf_ex_dict['id'],
json.loads(expected_json['input']),
headers=excepted_headers,
**kwargs
)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_headers_propagation_enabled_template_default(self, start_wf_func):
self.override_config(
'enabled',
True,
group='headers_propagation'
)
headers = {'Header1': "qwerty", 'Header2': "123",
'Wrongheader': "wow"}
wf_ex_dict = WF_EX.to_dict()
start_wf_func.return_value = wf_ex_dict
json_body = WF_EX_JSON_WITH_DESC.copy()
expected_json = WF_EX_JSON_WITH_DESC
resp = self.app.post_json('/v2/executions', json_body, headers=headers)
self.assertEqual(201, resp.status_int)
self.assertDictEqual(expected_json, resp.json)
kwargs = json.loads(expected_json['params'])
kwargs['description'] = expected_json['description']
args = start_wf_func.call_args
self.assertTrue(headers.items() <= args[1]['headers'].items())

View File

@ -24,8 +24,10 @@ from mistral.notifiers import default_notifier as d_notif
from mistral.notifiers import notification_events as events
from mistral.notifiers import remote_notifier as r_notif
from mistral.services import workflows as wf_svc
from mistral.tests.unit import base as test_base
from mistral.tests.unit.notifiers import base
from mistral.workflow import states
import requests
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
@ -229,3 +231,121 @@ class RemoteNotifServerTest(base.NotifierTestCase):
self.assertTrue(r_notif.RemoteNotifier.notify.called)
self.assertListEqual(expected_order, EVENT_LOGS)
@mock.patch.object(
r_notif.RemoteNotifier,
'notify',
mock.MagicMock(return_value=None)
)
class HeadersPropagationTest(base.NotifierTestCase):
wf_def = """
version: '2.0'
wf:
tasks:
t1:
action: std.noop
"""
params = {
"headers": {
"Header1": "qwerty",
"Header2": "123",
"Header3": "wow",
}
}
expected_headers = {'Header1': "qwerty", 'Header2': "123",
'Header3': "wow"}
@classmethod
def setUpClass(cls):
super(HeadersPropagationTest, cls).setUpClass()
cfg.CONF.set_default('type', 'local', group='notifier')
@classmethod
def tearDownClass(cls):
cfg.CONF.set_default('type', 'remote', group='notifier')
super(HeadersPropagationTest, cls).tearDownClass()
def setUp(self):
super(HeadersPropagationTest, self).setUp()
self.publisher = notif.get_notification_publisher('webhook')
del EVENT_LOGS[:]
def tearDown(self):
super(HeadersPropagationTest, self).tearDown()
notif.cleanup()
@mock.patch.object(d_notif.DefaultNotifier, 'notify')
def test_headers_propagation_for_workflow_and_task(self, mock_notify):
mock_notify.side_effect = notifier_process
notif_options = [{'type': 'webhook'}]
wf_svc.create_workflows(self.wf_def)
wf_ex = self.engine.start_workflow(
'wf',
'',
wf_input={},
notify=notif_options,
**self.params
)
self.assertFalse(d_notif.DefaultNotifier.notify.call_args)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
call_args_list = mock_notify.call_args_list
wf_has_headers = False
task_has_headers = False
for args in call_args_list:
_, data, _, _, _ = args[0]
if 'name' in data and 'headers' in data:
if data['name'] == 'wf' and \
data['headers'] == self.expected_headers:
wf_has_headers = True
if data['name'] == 't1' and \
data['headers'] == self.expected_headers:
task_has_headers = True
self.assertTrue(wf_has_headers)
self.assertTrue(task_has_headers)
self.assertFalse(r_notif.RemoteNotifier.notify.call_args)
self.assertFalse(r_notif.RemoteNotifier.notify.called)
@mock.patch.object(
requests, 'post',
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
def test_headers_propagation_webhook_publish(self):
notif_options = [{'type': 'webhook', "url": "http://example.com"}]
wf_svc.create_workflows(self.wf_def)
wf_ex = self.engine.start_workflow(
'wf',
'',
wf_input={},
notify=notif_options,
**self.params
)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertTrue(
requests.post.call_args[1]['headers'] == self.expected_headers)