Create and run a workflow within a namespace
Allow adding many workflows with the same name. The way this works is by adding the new workflows under another namespace. Notice that: 1. Namespaces are not part of the mistral DSL. 2. When executing a workflow, the namespace passes down to the subworkflow. 3. When searching for the sub-workflow definition - If no workflow was found in the given namespace, than the defualt namespace will be used. 4. The default namespace is an empty string. 5. The namespace property or the workflow execution will be under params Partially-Implements: blueprint create-and-run-workflows-within-a-namespace Change-Id: Id248ec5906a0899d188675d002b45f6249d36d90
This commit is contained in:
parent
d30ff30efa
commit
0e2962f679
|
@ -72,8 +72,8 @@ class MistralClientBase(rest_client.RestClient):
|
|||
self.action_executions = []
|
||||
self.event_triggers = []
|
||||
|
||||
def get_list_obj(self, name):
|
||||
resp, body = self.get(name)
|
||||
def get_list_obj(self, url_path):
|
||||
resp, body = self.get(url_path)
|
||||
|
||||
return resp, json.loads(body)
|
||||
|
||||
|
|
|
@ -24,21 +24,30 @@ CONF = config.CONF
|
|||
|
||||
class MistralClientV2(base.MistralClientBase):
|
||||
|
||||
def post_request(self, url, file_name):
|
||||
def post_request(self, url_path, file_name):
|
||||
headers = {"headers": "Content-Type:text/plain"}
|
||||
|
||||
return self.post(url, base.get_resource(file_name), headers=headers)
|
||||
return self.post(
|
||||
url_path,
|
||||
base.get_resource(file_name),
|
||||
headers=headers
|
||||
)
|
||||
|
||||
def post_json(self, url, obj, extra_headers={}):
|
||||
def get_request(self, url_path):
|
||||
headers = {"headers": "Content-Type:application/json"}
|
||||
|
||||
return self.get(url_path, headers=headers)
|
||||
|
||||
def post_json(self, url_path, obj, extra_headers={}):
|
||||
headers = {"Content-Type": "application/json"}
|
||||
headers = dict(headers, **extra_headers)
|
||||
return self.post(url, json.dumps(obj), headers=headers)
|
||||
return self.post(url_path, json.dumps(obj), headers=headers)
|
||||
|
||||
def update_request(self, url, file_name):
|
||||
def update_request(self, url_path, file_name):
|
||||
headers = {"headers": "Content-Type:text/plain"}
|
||||
|
||||
resp, body = self.put(
|
||||
url,
|
||||
url_path,
|
||||
base.get_resource(file_name),
|
||||
headers=headers
|
||||
)
|
||||
|
@ -64,26 +73,61 @@ class MistralClientV2(base.MistralClientBase):
|
|||
|
||||
return resp, json.loads(body)
|
||||
|
||||
def create_workflow(self, yaml_file, scope=None):
|
||||
def create_workflow(self, yaml_file, scope=None, namespace=None):
|
||||
url_path = 'workflows?'
|
||||
|
||||
if scope:
|
||||
resp, body = self.post_request('workflows?scope=public', yaml_file)
|
||||
else:
|
||||
resp, body = self.post_request('workflows', yaml_file)
|
||||
url_path += 'scope=public&'
|
||||
|
||||
if namespace:
|
||||
url_path += 'namespace=' + namespace
|
||||
|
||||
resp, body = self.post_request(url_path, yaml_file)
|
||||
|
||||
for wf in json.loads(body)['workflows']:
|
||||
self.workflows.append(wf['name'])
|
||||
identifier = wf['id'] if namespace else wf['name']
|
||||
self.workflows.append(identifier)
|
||||
|
||||
return resp, json.loads(body)
|
||||
|
||||
def get_workflow(self, wf_identifier, namespace=None):
|
||||
|
||||
url_path = 'workflows/' + wf_identifier
|
||||
if namespace:
|
||||
url_path += 'namespace=' + namespace
|
||||
|
||||
resp, body = self.get_request(url_path)
|
||||
|
||||
return resp, json.loads(body)
|
||||
|
||||
def update_workflow(self, file_name, namespace=None):
|
||||
url_path = "workflows?"
|
||||
|
||||
if namespace:
|
||||
url_path += 'namespace=' + namespace
|
||||
|
||||
return self.update_request(url_path, file_name=file_name)
|
||||
|
||||
def get_action_execution(self, action_execution_id):
|
||||
return self.get('action_executions/%s' % action_execution_id)
|
||||
|
||||
def create_execution(self, identifier, wf_input=None, params=None):
|
||||
def get_action_executions(self, task_id=None):
|
||||
url_path = 'action_executions'
|
||||
if task_id:
|
||||
url_path += '?task_execution_id=%s' % task_id
|
||||
|
||||
return self.get_list_obj(url_path)
|
||||
|
||||
def create_execution(self, identifier, wf_namespace=None, wf_input=None,
|
||||
params=None):
|
||||
if uuidutils.is_uuid_like(identifier):
|
||||
body = {"workflow_id": "%s" % identifier}
|
||||
else:
|
||||
body = {"workflow_name": "%s" % identifier}
|
||||
|
||||
if wf_namespace:
|
||||
body.update({'workflow_namespace': wf_namespace})
|
||||
|
||||
if wf_input:
|
||||
body.update({'input': json.dumps(wf_input)})
|
||||
if params:
|
||||
|
@ -100,6 +144,23 @@ class MistralClientV2(base.MistralClientBase):
|
|||
|
||||
return resp, json.loads(body)
|
||||
|
||||
def get_execution(self, execution_id):
|
||||
return self.get('executions/%s' % execution_id)
|
||||
|
||||
def get_executions(self, task_id):
|
||||
url_path = 'executions'
|
||||
if task_id:
|
||||
url_path += '?task_execution_id=%s' % task_id
|
||||
|
||||
return self.get_list_obj(url_path)
|
||||
|
||||
def get_tasks(self, execution_id=None):
|
||||
url_path = 'tasks'
|
||||
if execution_id:
|
||||
url_path += '?workflow_execution_id=%s' % execution_id
|
||||
|
||||
return self.get_list_obj(url_path)
|
||||
|
||||
def create_cron_trigger(self, name, wf_name, wf_input=None, pattern=None,
|
||||
first_time=None, count=None):
|
||||
post_body = {
|
||||
|
@ -133,11 +194,8 @@ class MistralClientV2(base.MistralClientBase):
|
|||
return [t for t in all_tasks if t['workflow_name'] == wf_name]
|
||||
|
||||
def create_action_execution(self, request_body, extra_headers={}):
|
||||
resp, body = self.post_json(
|
||||
'action_executions',
|
||||
request_body,
|
||||
extra_headers
|
||||
)
|
||||
resp, body = self.post_json('action_executions', request_body,
|
||||
extra_headers)
|
||||
|
||||
params = json.loads(request_body.get('params', '{}'))
|
||||
if params.get('save_result', False):
|
||||
|
|
|
@ -226,3 +226,31 @@ class ActionExecutionTestsV2(base.TestCase):
|
|||
self.assertEqual(201, resp.status)
|
||||
output = json.loads(body['output'])
|
||||
self.assertEqual(200, output['result']['status'])
|
||||
|
||||
@decorators.idempotent_id('9438e195-031c-4502-b216-6d72941ec281')
|
||||
@decorators.attr(type='sanity')
|
||||
def test_action_execution_of_workflow_within_namespace(self):
|
||||
|
||||
resp, body = self.client.create_workflow('wf_v2.yaml', namespace='abc')
|
||||
wf_name = body['workflows'][0]['name']
|
||||
wf_namespace = body['workflows'][0]['namespace']
|
||||
self.assertEqual(201, resp.status)
|
||||
resp, body = self.client.create_execution(
|
||||
wf_name,
|
||||
wf_namespace=wf_namespace
|
||||
)
|
||||
self.assertEqual(201, resp.status)
|
||||
resp, body = self.client.get_list_obj('tasks')
|
||||
self.assertEqual(200, resp.status)
|
||||
task_id = body['tasks'][0]['id']
|
||||
|
||||
resp, body = self.client.get_list_obj(
|
||||
'action_executions?include_output=true&task_execution_id=%s' %
|
||||
task_id)
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
action_execution = body['action_executions'][0]
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
action_execution = body['action_executions'][0]
|
||||
self.assertEqual(wf_namespace, action_execution['workflow_namespace'])
|
||||
|
|
|
@ -133,9 +133,7 @@ class ActionTestsV2(base.TestCase):
|
|||
resp, body = self.client.create_action('action_v2.yaml')
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
resp, body = self.client.get_list_obj(
|
||||
'actions?is_system=False'
|
||||
)
|
||||
resp, body = self.client.get_list_obj('actions?is_system=False')
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertNotEmpty(body['actions'])
|
||||
|
@ -149,9 +147,7 @@ class ActionTestsV2(base.TestCase):
|
|||
resp, body = self.client.create_action('action_v2.yaml')
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
resp, body = self.client.get_list_obj(
|
||||
'actions?is_system=neq:False'
|
||||
)
|
||||
resp, body = self.client.get_list_obj('actions?is_system=neq:False')
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertNotEmpty(body['actions'])
|
||||
|
@ -169,8 +165,7 @@ class ActionTestsV2(base.TestCase):
|
|||
_, body = self.client.get_object('actions', created_acts[0])
|
||||
time = body['created_at']
|
||||
resp, body = self.client.get_list_obj(
|
||||
'actions?created_at=in:' + time.replace(' ', '%20')
|
||||
)
|
||||
'actions?created_at=in:' + time.replace(' ', '%20'))
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
action_names = [action['name'] for action in body['actions']]
|
||||
|
|
|
@ -107,7 +107,7 @@ class EventTriggerTestsV2(base.TestCase):
|
|||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('56b90a90-9ff3-42f8-a9eb-04a77198710e')
|
||||
def test_get_nonexistent_event_trigger(self):
|
||||
fake_id = '123e4567-e89b-12d3-a456-426655440000'
|
||||
fake_id = '3771c152-d1a7-4a82-8a50-c79d122012dc'
|
||||
|
||||
self.assertRaises(exceptions.NotFound,
|
||||
self.client.get_object,
|
||||
|
|
|
@ -19,6 +19,8 @@ from tempest.lib import exceptions
|
|||
from mistral import utils
|
||||
from mistral_tempest_tests.tests import base
|
||||
|
||||
import json
|
||||
|
||||
|
||||
class ExecutionTestsV2(base.TestCase):
|
||||
|
||||
|
@ -72,8 +74,7 @@ class ExecutionTestsV2(base.TestCase):
|
|||
self.assertIn(exec_id_2, [ex['id'] for ex in body['executions']])
|
||||
|
||||
resp, body = self.client.get_list_obj(
|
||||
'executions?limit=1&sort_keys=workflow_name&sort_dirs=asc'
|
||||
)
|
||||
'executions?limit=1&sort_keys=workflow_name&sort_dirs=asc')
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertEqual(1, len(body['executions']))
|
||||
|
@ -127,8 +128,8 @@ class ExecutionTestsV2(base.TestCase):
|
|||
def test_create_execution_for_reverse_wf(self):
|
||||
resp, body = self.client.create_execution(
|
||||
self.reverse_wf['name'],
|
||||
{self.reverse_wf['input']: "Bye"},
|
||||
{"task_name": "goodbye"})
|
||||
wf_input={self.reverse_wf['input']: "Bye"},
|
||||
params={"task_name": "goodbye"})
|
||||
|
||||
exec_id = body['id']
|
||||
self.assertEqual(201, resp.status)
|
||||
|
@ -327,3 +328,91 @@ class ExecutionTestsV2(base.TestCase):
|
|||
'executions',
|
||||
exec_id
|
||||
)
|
||||
|
||||
@decorators.idempotent_id('a882876b-7565-4f7f-9714-d99032ffaabb')
|
||||
@decorators.attr(type='sanity')
|
||||
def test_workflow_execution_of_nested_workflows_within_namespace(self):
|
||||
low_wf = 'for_wf_namespace/lowest_level_wf.yaml'
|
||||
middle_wf = 'for_wf_namespace/middle_wf.yaml'
|
||||
top_wf = 'for_wf_namespace/top_level_wf.yaml'
|
||||
|
||||
resp, wf = self.client.create_workflow(low_wf)
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
namespace = 'abc'
|
||||
resp, wf = self.client.create_workflow(low_wf, namespace=namespace)
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
resp, wf = self.client.create_workflow(middle_wf)
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
resp, wf = self.client.create_workflow(top_wf)
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
resp, wf = self.client.create_workflow(top_wf, namespace=namespace)
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
wf_name = wf['workflows'][0]['name']
|
||||
resp, top_execution = self.client.create_execution(wf_name, namespace)
|
||||
|
||||
self.assertEqual(201, resp.status)
|
||||
self.assertEqual('RUNNING', top_execution['state'])
|
||||
self.assertEqual(wf_name, top_execution['workflow_name'])
|
||||
self.assertEqual(wf_name, top_execution['workflow_name'])
|
||||
self.assertEqual(namespace, top_execution['workflow_namespace'])
|
||||
|
||||
self.client.wait_execution(top_execution, target_state='SUCCESS')
|
||||
|
||||
self.assertEqual(
|
||||
namespace,
|
||||
json.loads(top_execution['params'])['namespace']
|
||||
)
|
||||
|
||||
resp, tasks = self.client.get_tasks(top_execution['id'])
|
||||
top_task = tasks['tasks'][0]
|
||||
|
||||
self.assertEqual(wf_name, top_task['workflow_name'])
|
||||
self.assertEqual(namespace, top_task['workflow_namespace'])
|
||||
|
||||
resp, executions = self.client.get_executions(top_task['id'])
|
||||
middle_execution = executions['executions'][0]
|
||||
|
||||
self.assertEqual('middle_wf', middle_execution['workflow_name'])
|
||||
self.assertEqual('', middle_execution['workflow_namespace'])
|
||||
|
||||
self.assertEqual(
|
||||
namespace,
|
||||
json.loads(middle_execution['params'])['namespace']
|
||||
)
|
||||
|
||||
resp, tasks = self.client.get_tasks(middle_execution['id'])
|
||||
middle_task = tasks['tasks'][0]
|
||||
|
||||
self.assertEqual('middle_wf', middle_task['workflow_name'])
|
||||
self.assertEqual('', middle_task['workflow_namespace'])
|
||||
|
||||
resp, executions = self.client.get_executions(middle_task['id'])
|
||||
lowest_execution = executions['executions'][0]
|
||||
|
||||
self.assertEqual('lowest_level_wf', lowest_execution['workflow_name'])
|
||||
self.assertEqual(namespace, lowest_execution['workflow_namespace'])
|
||||
|
||||
self.assertEqual(
|
||||
namespace,
|
||||
json.loads(lowest_execution['params'])['namespace']
|
||||
)
|
||||
|
||||
resp, tasks = self.client.get_tasks(lowest_execution['id'])
|
||||
lowest_task = tasks['tasks'][0]
|
||||
|
||||
self.assertEqual('lowest_level_wf', lowest_task['workflow_name'])
|
||||
self.assertEqual(namespace, lowest_task['workflow_namespace'])
|
||||
|
||||
resp, action_executions = self.client.get_action_executions(
|
||||
lowest_task['id']
|
||||
)
|
||||
|
||||
action_execution = action_executions['action_executions'][0]
|
||||
|
||||
self.assertEqual('lowest_level_wf', action_execution['workflow_name'])
|
||||
self.assertEqual(namespace, action_execution['workflow_namespace'])
|
||||
|
|
|
@ -63,8 +63,7 @@ class TasksTestsV2(base.TestCase):
|
|||
@decorators.idempotent_id('3230d694-40fd-4094-ad12-024f40a21b94')
|
||||
def test_get_tasks_of_execution(self):
|
||||
resp, body = self.client.get_list_obj(
|
||||
'tasks?workflow_execution_id=%s' % self.execution_id
|
||||
)
|
||||
'tasks?workflow_execution_id=%s' % self.execution_id)
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertEqual(
|
||||
|
|
|
@ -242,6 +242,58 @@ class WorkflowTestsV2(base.TestCase):
|
|||
self.assertEqual(200, resp.status)
|
||||
self.assertEqual(name, body['workflows'][0]['name'])
|
||||
|
||||
@decorators.attr(type='sanity')
|
||||
@decorators.idempotent_id('42f5d135-a2b8-4a31-8135-c5ce8c5f1ed5')
|
||||
def test_workflow_within_namespace(self):
|
||||
self.useFixture(lockutils.LockFixture('mistral-workflow'))
|
||||
|
||||
namespace = 'abc'
|
||||
resp, body = self.client.create_workflow(
|
||||
'single_wf.yaml',
|
||||
namespace=namespace
|
||||
)
|
||||
name = body['workflows'][0]['name']
|
||||
id = body['workflows'][0]['id']
|
||||
|
||||
self.assertEqual(201, resp.status)
|
||||
self.assertEqual(name, body['workflows'][0]['name'])
|
||||
|
||||
resp, body = self.client.get_workflow(
|
||||
id
|
||||
)
|
||||
|
||||
self.assertEqual(namespace, body['namespace'])
|
||||
|
||||
resp, body = self.client.update_workflow('single_wf.yaml', namespace)
|
||||
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertEqual(name, body['workflows'][0]['name'])
|
||||
self.assertEqual(namespace, body['workflows'][0]['namespace'])
|
||||
|
||||
namespace = 'abc2'
|
||||
resp, body = self.client.create_workflow(
|
||||
'single_wf.yaml',
|
||||
namespace=namespace
|
||||
)
|
||||
name = body['workflows'][0]['name']
|
||||
id = body['workflows'][0]['id']
|
||||
|
||||
self.assertEqual(201, resp.status)
|
||||
self.assertEqual(name, body['workflows'][0]['name'])
|
||||
|
||||
resp, body = self.client.get_workflow(id)
|
||||
|
||||
self.assertEqual(namespace, body['namespace'])
|
||||
|
||||
self.assertRaises(exceptions.NotFound, self.client.get_workflow, name)
|
||||
|
||||
self.client.create_workflow(
|
||||
'single_wf.yaml'
|
||||
)
|
||||
|
||||
resp, body = self.client.get_workflow(id)
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
@decorators.attr(type='sanity')
|
||||
@decorators.idempotent_id('02bc1fc3-c31a-4e37-bb3d-eda46818505c')
|
||||
def test_get_workflow_definition(self):
|
||||
|
@ -280,6 +332,15 @@ class WorkflowTestsV2(base.TestCase):
|
|||
self.assertRaises(exceptions.NotFound, self.client.get_object,
|
||||
'workflows', 'nonexist')
|
||||
|
||||
exception = self.assertRaises(
|
||||
exceptions.NotFound,
|
||||
self.client.get_workflow,
|
||||
'nonexist_wf',
|
||||
'nonexist_namespace'
|
||||
)
|
||||
self.assertIn('nonexist_wf', str(exception))
|
||||
self.assertIn('nonexist_namespace', str(exception))
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('6b917213-7f11-423a-8fe0-55795dcf0fb2')
|
||||
def test_double_create_workflows(self):
|
||||
|
|
Loading…
Reference in New Issue