Support workflow id in execution operations

Workflow UUID will be used in Mistral instead of workflow name, which is
not unique in the system. So, we need to support using workflow UUID in
execution operations.

Partially implements: blueprint use-workflow-id-in-rest-api
Change-Id: I1b83eb75aa89484235e2150ab4f111da4ea766b7
This commit is contained in:
Lingxian Kong 2016-02-03 00:08:45 +13:00
parent a707998d54
commit 54a8e30406
11 changed files with 56 additions and 17 deletions

View File

@ -45,6 +45,9 @@ class Execution(resource.Resource):
workflow_name = wtypes.text workflow_name = wtypes.text
"reference to workflow definition" "reference to workflow definition"
workflow_id = wtypes.text
"reference to workflow ID"
description = wtypes.text description = wtypes.text
"description of workflow execution." "description of workflow execution."
@ -74,6 +77,7 @@ class Execution(resource.Resource):
def sample(cls): def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000', return cls(id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow', workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
description='this is the first execution.', description='this is the first execution.',
state='SUCCESS', state='SUCCESS',
input={}, input={},
@ -220,8 +224,15 @@ class ExecutionsController(rest.RestController):
engine = rpc.get_engine_client() engine = rpc.get_engine_client()
exec_dict = wf_ex.to_dict() exec_dict = wf_ex.to_dict()
if not (exec_dict.get('workflow_id')
or exec_dict.get('workflow_name')):
raise exc.WorkflowException(
"Workflow ID or workflow name must be provided. Workflow ID is"
" recommended."
)
result = engine.start_workflow( result = engine.start_workflow(
exec_dict['workflow_name'], exec_dict.get('workflow_id', exec_dict['workflow_name']),
exec_dict.get('input'), exec_dict.get('input'),
exec_dict.get('description', ''), exec_dict.get('description', ''),
**exec_dict.get('params') or {} **exec_dict.get('params') or {}

View File

@ -43,6 +43,7 @@ class Task(resource.Resource):
name = wtypes.text name = wtypes.text
workflow_name = wtypes.text workflow_name = wtypes.text
workflow_id = wtypes.text
workflow_execution_id = wtypes.text workflow_execution_id = wtypes.text
state = wtypes.text state = wtypes.text
@ -69,6 +70,7 @@ class Task(resource.Resource):
return cls( return cls(
id='123e4567-e89b-12d3-a456-426655440000', id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow', workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
workflow_execution_id='123e4567-e89b-12d3-a456-426655440000', workflow_execution_id='123e4567-e89b-12d3-a456-426655440000',
name='task', name='task',
state=states.SUCCESS, state=states.SUCCESS,

View File

@ -121,8 +121,8 @@ class Execution(mb.MistralSecureModelBase):
id = mb.id_column() id = mb.id_column()
name = sa.Column(sa.String(80)) name = sa.Column(sa.String(80))
description = sa.Column(sa.String(255), nullable=True) description = sa.Column(sa.String(255), nullable=True)
workflow_name = sa.Column(sa.String(80)) workflow_name = sa.Column(sa.String(80))
workflow_id = sa.Column(sa.String(80))
spec = sa.Column(st.JsonDictType()) spec = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20)) state = sa.Column(sa.String(20))
state_info = sa.Column(sa.Text(), nullable=True) state_info = sa.Column(sa.Text(), nullable=True)

View File

@ -62,6 +62,7 @@ def create_action_execution(action_def, action_input, task_ex=None,
values.update({ values.update({
'task_execution_id': task_ex.id, 'task_execution_id': task_ex.id,
'workflow_name': task_ex.workflow_name, 'workflow_name': task_ex.workflow_name,
'workflow_id': task_ex.workflow_id,
'project_id': task_ex.project_id, 'project_id': task_ex.project_id,
}) })
else: else:

View File

@ -29,10 +29,12 @@ class Engine(object):
"""Engine interface.""" """Engine interface."""
@abc.abstractmethod @abc.abstractmethod
def start_workflow(self, wf_name, wf_input, description='', **params): def start_workflow(self, wf_identifier, wf_input, description='',
**params):
"""Starts the specified workflow. """Starts the specified workflow.
:param wf_name: Workflow name. :param wf_identifier: Workflow ID or name. Workflow ID is recommended,
workflow name will be deprecated since Mitaka.
:param wf_input: Workflow input data as a dictionary. :param wf_input: Workflow input data as a dictionary.
:param description: Execution description. :param description: Execution description.
:param params: Additional workflow type specific parameters. :param params: Additional workflow type specific parameters.

View File

@ -50,7 +50,8 @@ class DefaultEngine(base.Engine, coordination.Service):
coordination.Service.__init__(self, 'engine_group') coordination.Service.__init__(self, 'engine_group')
@u.log_exec(LOG) @u.log_exec(LOG)
def start_workflow(self, wf_name, wf_input, description='', **params): def start_workflow(self, wf_identifier, wf_input, description='',
**params):
wf_ex_id = None wf_ex_id = None
try: try:
@ -58,7 +59,7 @@ class DefaultEngine(base.Engine, coordination.Service):
# The new workflow execution will be in an IDLE # The new workflow execution will be in an IDLE
# state on initial record creation. # state on initial record creation.
wf_ex_id = wf_ex_service.create_workflow_execution( wf_ex_id = wf_ex_service.create_workflow_execution(
wf_name, wf_identifier,
wf_input, wf_input,
description, description,
params params
@ -87,7 +88,7 @@ class DefaultEngine(base.Engine, coordination.Service):
except Exception as e: except Exception as e:
LOG.error( LOG.error(
"Failed to start workflow '%s' id=%s: %s\n%s", "Failed to start workflow '%s' id=%s: %s\n%s",
wf_name, wf_ex_id, e, traceback.format_exc() wf_identifier, wf_ex_id, e, traceback.format_exc()
) )
wf_ex = self._fail_workflow(wf_ex_id, e) wf_ex = self._fail_workflow(wf_ex_id, e)

View File

@ -77,7 +77,7 @@ class EngineServer(object):
def __init__(self, engine): def __init__(self, engine):
self._engine = engine self._engine = engine
def start_workflow(self, rpc_ctx, workflow_name, workflow_input, def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input,
description, params): description, params):
"""Receives calls over RPC to start workflows on engine. """Receives calls over RPC to start workflows on engine.
@ -87,12 +87,14 @@ class EngineServer(object):
LOG.info( LOG.info(
"Received RPC request 'start_workflow'[rpc_ctx=%s," "Received RPC request 'start_workflow'[rpc_ctx=%s,"
" workflow_name=%s, workflow_input=%s, description=%s, params=%s]" " workflow_identifier=%s, workflow_input=%s, description=%s, "
% (rpc_ctx, workflow_name, workflow_input, description, params) "params=%s]"
% (rpc_ctx, workflow_identifier, workflow_input, description,
params)
) )
return self._engine.start_workflow( return self._engine.start_workflow(
workflow_name, workflow_identifier,
workflow_input, workflow_input,
description, description,
**params **params
@ -283,7 +285,8 @@ class EngineClient(base.Engine):
) )
@wrap_messaging_exception @wrap_messaging_exception
def start_workflow(self, wf_name, wf_input, description='', **params): def start_workflow(self, wf_identifier, wf_input, description='',
**params):
"""Starts workflow sending a request to engine over RPC. """Starts workflow sending a request to engine over RPC.
:return: Workflow execution. :return: Workflow execution.
@ -291,7 +294,7 @@ class EngineClient(base.Engine):
return self._client.call( return self._client.call(
auth_ctx.ctx(), auth_ctx.ctx(),
'start_workflow', 'start_workflow',
workflow_name=wf_name, workflow_identifier=wf_identifier,
workflow_input=wf_input or {}, workflow_input=wf_input or {},
description=description, description=description,
params=params params=params

View File

@ -208,6 +208,7 @@ def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING):
'name': task_spec.get_name(), 'name': task_spec.get_name(),
'workflow_execution_id': wf_ex.id, 'workflow_execution_id': wf_ex.id,
'workflow_name': wf_ex.workflow_name, 'workflow_name': wf_ex.workflow_name,
'workflow_id': wf_ex.workflow_id,
'state': state, 'state': state,
'spec': task_spec.to_dict(), 'spec': task_spec.to_dict(),
'in_context': ctx, 'in_context': ctx,

View File

@ -50,6 +50,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
'name': wf_def.name, 'name': wf_def.name,
'description': desc, 'description': desc,
'workflow_name': wf_def.name, 'workflow_name': wf_def.name,
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(), 'spec': wf_spec.to_dict(),
'params': params or {}, 'params': params or {},
'state': states.IDLE, 'state': states.IDLE,
@ -70,10 +71,10 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
return wf_ex return wf_ex
def create_workflow_execution(wf_name, wf_input, description, params): def create_workflow_execution(wf_identifier, wf_input, description, params):
params = canonize_workflow_params(params) params = canonize_workflow_params(params)
wf_def = db_api.get_workflow_definition(wf_name) wf_def = db_api.get_workflow_definition(wf_identifier)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec) wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
eng_utils.validate_input(wf_def, wf_input, wf_spec) eng_utils.validate_input(wf_def, wf_input, wf_spec)
@ -86,6 +87,6 @@ def create_workflow_execution(wf_name, wf_input, description, params):
params params
) )
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)
return wf_ex.id return wf_ex.id

View File

@ -33,6 +33,7 @@ from mistral.workflow import states
WF_EX = models.WorkflowExecution( WF_EX = models.WorkflowExecution(
id='123e4567-e89b-12d3-a456-426655440000', id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='some', workflow_name='some',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
description='execution description.', description='execution description.',
spec={'name': 'some'}, spec={'name': 'some'},
state=states.RUNNING, state=states.RUNNING,
@ -54,11 +55,13 @@ WF_EX_JSON = {
'created_at': '1970-01-01 00:00:00', 'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00', 'updated_at': '1970-01-01 00:00:00',
'workflow_name': 'some', 'workflow_name': 'some',
'workflow_id': '123e4567-e89b-12d3-a456-426655441111'
} }
SUB_WF_EX = models.WorkflowExecution( SUB_WF_EX = models.WorkflowExecution(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
workflow_name='some', workflow_name='some',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
description='foobar', description='foobar',
spec={'name': 'some'}, spec={'name': 'some'},
state=states.RUNNING, state=states.RUNNING,
@ -74,6 +77,7 @@ SUB_WF_EX = models.WorkflowExecution(
SUB_WF_EX_JSON = { SUB_WF_EX_JSON = {
'id': SUB_WF_EX.id, 'id': SUB_WF_EX.id,
'workflow_name': 'some', 'workflow_name': 'some',
'workflow_id': '123e4567-e89b-12d3-a456-426655441111',
'description': 'foobar', 'description': 'foobar',
'input': '{"foo": "bar"}', 'input': '{"foo": "bar"}',
'output': '{}', 'output': '{}',
@ -389,7 +393,7 @@ class TestExecutionsController(base.FunctionalTest):
exec_dict = WF_EX_JSON_WITH_DESC exec_dict = WF_EX_JSON_WITH_DESC
f.assert_called_once_with( f.assert_called_once_with(
exec_dict['workflow_name'], exec_dict['workflow_id'],
json.loads(exec_dict['input']), json.loads(exec_dict['input']),
exec_dict['description'], exec_dict['description'],
**json.loads(exec_dict['params']) **json.loads(exec_dict['params'])
@ -406,6 +410,16 @@ class TestExecutionsController(base.FunctionalTest):
self.assertIn('Bad response: 400', context.args[0]) self.assertIn('Bad response: 400', context.args[0])
def test_post_without_workflow_id_and_name(self):
context = self.assertRaises(
webtest_app.AppError,
self.app.post_json,
'/v2/executions',
{'description': 'some description here.'}
)
self.assertIn('Bad response: 400', context.args[0])
@mock.patch.object(db_api, 'delete_workflow_execution', MOCK_DELETE) @mock.patch.object(db_api, 'delete_workflow_execution', MOCK_DELETE)
def test_delete(self): def test_delete(self):
resp = self.app.delete('/v2/executions/123') resp = self.app.delete('/v2/executions/123')

View File

@ -49,6 +49,7 @@ TASK_EX = models.TaskExecution(
id='123', id='123',
name='task', name='task',
workflow_name='flow', workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
spec={ spec={
'type': 'direct', 'type': 'direct',
'version': '2.0', 'version': '2.0',
@ -70,6 +71,7 @@ WITH_ITEMS_TASK_EX = models.TaskExecution(
id='123', id='123',
name='task', name='task',
workflow_name='flow', workflow_name='flow',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
spec={ spec={
'type': 'direct', 'type': 'direct',
'version': '2.0', 'version': '2.0',
@ -92,6 +94,7 @@ TASK = {
'id': '123', 'id': '123',
'name': 'task', 'name': 'task',
'workflow_name': 'flow', 'workflow_name': 'flow',
'workflow_id': '123e4567-e89b-12d3-a456-426655441111',
'state': 'RUNNING', 'state': 'RUNNING',
'workflow_execution_id': WF_EX.id, 'workflow_execution_id': WF_EX.id,
'created_at': '1970-01-01 00:00:00', 'created_at': '1970-01-01 00:00:00',