Support UUID when updating a workflow definition

* When updating a workflow with UUID provided, only one workflow
  definition could be contained in request body.
* Workflow name can also be updated when using UUID.
* Tenant can not update workflows of other tenants.

Partially implements: blueprint use-workflow-id-in-rest-api
Change-Id: I87f9122b8ad9727a3eeb429ce19835c57c76b32d
This commit is contained in:
Lingxian Kong 2015-12-12 22:10:26 +08:00
parent fe7f3e69a7
commit b468b9f524
6 changed files with 122 additions and 27 deletions

View File

@ -132,11 +132,14 @@ class WorkflowsController(rest.RestController, hooks.HookController):
@rest_utils.wrap_pecan_controller_exception
@pecan.expose(content_type="text/plain")
def put(self):
def put(self, identifier=None):
"""Update one or more workflows.
NOTE: The text is allowed to have definitions
of multiple workflows. In this case they all will be updated.
:param identifier: Optional. If provided, it's UUID of a workflow.
Only one workflow can be updated with identifier param.
The text is allowed to have definitions of multiple workflows. In this
case they all will be updated.
"""
definition = pecan.request.text
scope = pecan.request.GET.get('scope', 'private')
@ -146,14 +149,20 @@ class WorkflowsController(rest.RestController, hooks.HookController):
"Scope must be one of the following: %s; actual: "
"%s" % (SCOPE_TYPES.values, scope)
)
LOG.info("Update workflow(s) [definition=%s]" % definition)
db_wfs = workflows.update_workflows(definition, scope=scope)
models_dicts = [db_wf.to_dict() for db_wf in db_wfs]
db_wfs = workflows.update_workflows(
definition,
scope=scope,
identifier=identifier
)
models_dicts = [db_wf.to_dict() for db_wf in db_wfs]
workflow_list = [Workflow.from_dict(wf) for wf in models_dicts]
return Workflows(workflows=workflow_list).to_string()
return (workflow_list[0].to_string() if identifier
else Workflows(workflows=workflow_list).to_string())
@rest_utils.wrap_pecan_controller_exception
@pecan.expose(content_type="text/plain")

View File

@ -132,8 +132,8 @@ def create_workflow_definition(values):
return IMPL.create_workflow_definition(values)
def update_workflow_definition(name, values):
return IMPL.update_workflow_definition(name, values)
def update_workflow_definition(identifier, values):
return IMPL.update_workflow_definition(identifier, values)
def create_or_update_workflow_definition(name, values):

View File

@ -332,12 +332,19 @@ def create_workflow_definition(values, session=None):
@b.session_aware()
def update_workflow_definition(name, values, session=None):
wf_def = _get_workflow_definition(name)
def update_workflow_definition(identifier, values, session=None):
wf_def = get_workflow_definition(identifier)
if not wf_def:
raise exc.NotFoundException(
"Workflow not found [workflow_name=%s]" % name)
if wf_def.project_id != security.get_project_id():
raise exc.NotAllowedException(
"Can not update workflow of other tenants. "
"[workflow_identifier=%s]" % identifier
)
if wf_def.is_system:
raise exc.InvalidActionException(
"Attempt to modify a system workflow: %s" % identifier
)
wf_def.update(values.copy())

View File

@ -78,8 +78,15 @@ def _append_all_workflows(definition, is_system, scope, wf_list_spec, db_wfs):
)
def update_workflows(definition, scope='private'):
def update_workflows(definition, scope='private', identifier=None):
wf_list_spec = spec_parser.get_workflow_list_spec_from_yaml(definition)
wfs = wf_list_spec.get_workflows()
if identifier and len(wfs) > 1:
raise exc.InputException(
"More than one workflows are not supported for update with UUID "
"provided."
)
db_wfs = []
@ -88,7 +95,8 @@ def update_workflows(definition, scope='private'):
db_wfs.append(_update_workflow(
wf_spec,
definition,
scope
scope,
identifier=identifier
))
return db_wfs
@ -113,14 +121,10 @@ def _create_workflow(wf_spec, definition, scope, is_system):
)
def _update_workflow(wf_spec, definition, scope):
workflow = db_api.load_workflow_definition(wf_spec.get_name())
if workflow and workflow.is_system:
raise exc.InvalidActionException(
"Attempt to modify a system workflow: %s" %
workflow.name
)
def _update_workflow(wf_spec, definition, scope, identifier=None):
values = _get_workflow_values(wf_spec, definition, scope)
return db_api.update_workflow_definition(values['name'], values)
return db_api.update_workflow_definition(
identifier if identifier else values['name'],
values
)

View File

@ -139,6 +139,20 @@ flow:
action: std.echo output=<% * %>
"""
WFS_DEFINITION = """
---
version: '2.0'
wf1:
tasks:
task1:
action: std.echo output="Hello"
wf2:
tasks:
task1:
action: std.echo output="Mistral"
"""
MOCK_WF = mock.MagicMock(return_value=WF_DB)
MOCK_WF_SYSTEM = mock.MagicMock(return_value=WF_DB_SYSTEM)
MOCK_WF_WITH_INPUT = mock.MagicMock(return_value=WF_DB_WITH_INPUT)
@ -188,10 +202,29 @@ class TestWorkflowsController(base.FunctionalTest):
self.assertEqual(200, resp.status_int)
self.assertDictEqual({'workflows': [UPDATED_WF]}, resp.json)
@mock.patch.object(
db_api, "load_workflow_definition", MOCK_WF_SYSTEM
@mock.patch("mistral.services.workflows.update_workflows")
def test_put_with_uuid(self, update_mock):
update_mock.return_value = [UPDATED_WF_DB]
resp = self.app.put(
'/v2/workflows/123e4567-e89b-12d3-a456-426655440000',
UPDATED_WF_DEFINITION,
headers={'Content-Type': 'text/plain'}
)
self.assertEqual(200, resp.status_int)
update_mock.assert_called_once_with(
UPDATED_WF_DEFINITION,
scope='private',
identifier='123e4567-e89b-12d3-a456-426655440000'
)
self.assertDictEqual(UPDATED_WF, resp.json)
@mock.patch(
"mistral.db.v2.sqlalchemy.api.get_workflow_definition",
return_value=WF_DB_SYSTEM
)
def test_put_system(self):
def test_put_system(self, get_mock):
resp = self.app.put(
'/v2/workflows',
UPDATED_WF_DEFINITION,
@ -256,6 +289,20 @@ class TestWorkflowsController(base.FunctionalTest):
self.assertEqual(400, resp.status_int)
self.assertIn("Invalid DSL", resp.body.decode())
def test_put_more_workflows_with_uuid(self):
resp = self.app.put(
'/v2/workflows/123e4567-e89b-12d3-a456-426655440000',
WFS_DEFINITION,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn(
"More than one workflows are not supported for update",
resp.body.decode()
)
@mock.patch.object(db_api, "create_workflow_definition")
def test_post(self, mock_mtd):
mock_mtd.return_value = WF_DB

View File

@ -278,6 +278,7 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
self.assertIsNone(created.updated_at)
# Update workflow using workflow name as identifier.
updated = db_api.update_workflow_definition(
created['name'],
{'definition': 'my new definition'}
@ -290,6 +291,33 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
# Update workflow using workflow uuid as identifier.
updated = db_api.update_workflow_definition(
created['id'],
{'name': 'updated_name', 'definition': 'my new definition'}
)
self.assertEqual('updated_name', updated.name)
self.assertEqual('my new definition', updated.definition)
fetched = db_api.get_workflow_definition(created['id'])
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
def test_update_other_project_workflow_definition(self):
created = db_api.create_workflow_definition(WF_DEFINITIONS[0])
# Switch to another project.
auth_context.set_ctx(test_base.get_context(default=False))
self.assertRaises(
exc.NotAllowedException,
db_api.update_workflow_definition,
created.name,
{'definition': 'my new definition'}
)
def test_create_or_update_workflow_definition(self):
name = WF_DEFINITIONS[0]['name']