Update REST API to support env update

Update workflow execution and task execution controllers to allow update of
environment variables.

Partially implements: blueprint mistral-rerun-update-env

Change-Id: Iaa829d3c22a51f48a81a862bce7d1ea67097a6f0
This commit is contained in:
Winson Chan 2015-12-17 03:14:39 +00:00
parent 0e7be02a48
commit e147bf96d4
5 changed files with 346 additions and 96 deletions

View File

@ -26,6 +26,7 @@ from mistral.api.controllers.v2 import types
from mistral.db.v2 import api as db_api
from mistral.engine import rpc
from mistral import exceptions as exc
from mistral.services import workflows as wf_service
from mistral.utils import rest_utils
from mistral.workflow import states
@ -124,37 +125,81 @@ class ExecutionsController(rest.RestController):
db_api.ensure_workflow_execution_exists(id)
new_state = wf_ex.state
new_description = wf_ex.description
msg = wf_ex.state_info if wf_ex.state_info else None
delta = {}
# Currently we can change only state or description.
if (not (new_state or new_description) or
(new_state and new_description)):
raise exc.DataAccessException(
"Only state or description of execution can be changed. "
"But they can not be changed at the same time."
if wf_ex.state:
delta['state'] = wf_ex.state
if wf_ex.description:
delta['description'] = wf_ex.description
if wf_ex.params and wf_ex.params.get('env'):
delta['env'] = wf_ex.params.get('env')
# Currently we can change only state, description, or env.
if len(delta.values()) <= 0:
raise exc.InputException(
'The property state, description, or env '
'is not provided for update.'
)
if new_description:
# Description cannot be updated together with state.
if delta.get('description') and delta.get('state'):
raise exc.InputException(
'The property description must be updated '
'separately from state.'
)
# If state change, environment cannot be updated if not RUNNING.
if (delta.get('env') and
delta.get('state') and delta['state'] != states.RUNNING):
raise exc.InputException(
'The property env can only be updated when workflow '
'execution is not running or on resume from pause.'
)
if delta.get('description'):
wf_ex = db_api.update_workflow_execution(
id,
{'description': new_description}
{'description': delta['description']}
)
elif new_state == states.PAUSED:
wf_ex = rpc.get_engine_client().pause_workflow(id)
elif new_state == states.RUNNING:
wf_ex = rpc.get_engine_client().resume_workflow(id)
elif new_state in [states.SUCCESS, states.ERROR]:
wf_ex = rpc.get_engine_client().stop_workflow(id, new_state, msg)
else:
# To prevent changing state in other cases throw a message.
raise exc.DataAccessException(
"Can not change state to %s. Allowed states are: '%s" %
(new_state, ", ".join([states.RUNNING, states.PAUSED,
states.SUCCESS, states.ERROR]))
)
if not delta.get('state') and delta.get('env'):
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(id)
wf_ex = wf_service.update_workflow_execution_env(
wf_ex,
delta.get('env')
)
if delta.get('state'):
if delta.get('state') == states.PAUSED:
wf_ex = rpc.get_engine_client().pause_workflow(id)
elif delta.get('state') == states.RUNNING:
wf_ex = rpc.get_engine_client().resume_workflow(
id,
env=delta.get('env')
)
elif delta.get('state') in [states.SUCCESS, states.ERROR]:
msg = wf_ex.state_info if wf_ex.state_info else None
wf_ex = rpc.get_engine_client().stop_workflow(
id,
delta.get('state'),
msg
)
else:
# To prevent changing state in other cases throw a message.
raise exc.InputException(
"Cannot change state to %s. Allowed states are: '%s" % (
wf_ex.state,
', '.join([
states.RUNNING,
states.PAUSED,
states.SUCCESS,
states.ERROR
])
)
)
return Execution.from_dict(
wf_ex if isinstance(wf_ex, dict) else wf_ex.to_dict()

View File

@ -62,6 +62,8 @@ class Task(resource.Resource):
# Add this param to make Mistral API work with WSME 0.8.0 or higher version
reset = wsme.wsattr(bool, mandatory=True)
env = types.jsontype
@classmethod
def sample(cls):
return cls(
@ -142,6 +144,7 @@ class TasksController(rest.RestController):
task_spec = spec_parser.get_task_spec(task_ex.spec)
task_name = task.name or None
reset = task.reset
env = task.env or None
if task_name and task_name != task_ex.name:
raise exc.WorkflowException('Task name does not match.')
@ -171,7 +174,8 @@ class TasksController(rest.RestController):
rpc.get_engine_client().rerun_workflow(
wf_ex.id,
task_ex.id,
reset
reset=reset,
env=env
)
task_ex = db_api.get_task_execution(id)

View File

@ -148,6 +148,7 @@ class EngineServer(object):
"""Receives calls over RPC to pause workflows on engine.
:param rpc_ctx: Request context.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
@ -158,13 +159,15 @@ class EngineServer(object):
return self._engine.pause_workflow(execution_id)
def rerun_workflow(self, rpc_ctx, wf_ex_id, task_ex_id, reset=True):
def rerun_workflow(self, rpc_ctx, wf_ex_id, task_ex_id,
reset=True, env=None):
"""Receives calls over RPC to rerun workflows on engine.
:param rpc_ctx: RPC request context.
:param wf_ex_id: Workflow execution id.
:param task_ex_id: Task execution id.
:param reset: If true, then purge action execution for the task.
:param env: Environment variables to update.
:return: Workflow execution.
"""
@ -173,13 +176,14 @@ class EngineServer(object):
"wf_ex_id=%s, task_ex_id=%s]" % (rpc_ctx, wf_ex_id, task_ex_id)
)
return self._engine.rerun_workflow(wf_ex_id, task_ex_id, reset)
return self._engine.rerun_workflow(wf_ex_id, task_ex_id, reset, env)
def resume_workflow(self, rpc_ctx, wf_ex_id):
def resume_workflow(self, rpc_ctx, wf_ex_id, env=None):
"""Receives calls over RPC to resume workflows on engine.
:param rpc_ctx: RPC request context.
:param wf_ex_id: Workflow execution id.
:param env: Environment variables to update.
:return: Workflow execution.
"""
@ -188,7 +192,7 @@ class EngineServer(object):
"wf_ex_id=%s]" % (rpc_ctx, wf_ex_id)
)
return self._engine.resume_workflow(wf_ex_id)
return self._engine.resume_workflow(wf_ex_id, env)
def stop_workflow(self, rpc_ctx, execution_id, state, message=None):
"""Receives calls over RPC to stop workflows on engine.
@ -356,7 +360,7 @@ class EngineClient(base.Engine):
)
@wrap_messaging_exception
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True):
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None):
"""Rerun the workflow.
This method reruns workflow with the given execution id
@ -365,6 +369,7 @@ class EngineClient(base.Engine):
:param wf_ex_id: Workflow execution id.
:param task_ex_id: Task execution id.
:param reset: If true, then purge action execution for the task.
:param env: Environment variables to update.
:return: Workflow execution.
"""
@ -373,21 +378,24 @@ class EngineClient(base.Engine):
'rerun_workflow',
wf_ex_id=wf_ex_id,
task_ex_id=task_ex_id,
reset=reset
reset=reset,
env=env
)
@wrap_messaging_exception
def resume_workflow(self, wf_ex_id):
def resume_workflow(self, wf_ex_id, env=None):
"""Resumes the workflow with the given execution id.
:param wf_ex_id: Workflow execution id.
:param env: Environment variables to update.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'resume_workflow',
wf_ex_id=wf_ex_id
wf_ex_id=wf_ex_id,
env=env
)
@wrap_messaging_exception

View File

@ -21,6 +21,7 @@ import mock
from webtest import app as webtest_app
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import api as sql_db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine import rpc
from mistral import exceptions as exc
@ -60,6 +61,13 @@ UPDATED_WF_EX['state'] = states.PAUSED
UPDATED_WF_EX_JSON = copy.deepcopy(WF_EX_JSON)
UPDATED_WF_EX_JSON['state'] = states.PAUSED
UPDATED_WF_EX_ENV = copy.deepcopy(UPDATED_WF_EX)
UPDATED_WF_EX_ENV['params'] = {'env': {'k1': 'def'}}
UPDATED_WF_EX_ENV_DESC = copy.deepcopy(UPDATED_WF_EX)
UPDATED_WF_EX_ENV_DESC['description'] = 'foobar'
UPDATED_WF_EX_ENV_DESC['params'] = {'env': {'k1': 'def'}}
WF_EX_JSON_WITH_DESC = copy.deepcopy(WF_EX_JSON)
WF_EX_JSON_WITH_DESC['description'] = "execution description."
@ -91,67 +99,141 @@ class TestExecutionsController(base.FunctionalTest):
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
MOCK_WF_EX
mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc.EngineClient, 'pause_workflow',
MOCK_UPDATED_WF_EX)
def test_put(self):
resp = self.app.put_json('/v2/executions/123', UPDATED_WF_EX_JSON)
@mock.patch.object(
rpc.EngineClient,
'pause_workflow',
MOCK_UPDATED_WF_EX
)
def test_put_state_paused(self):
update_exec = {
'id': WF_EX['id'],
'state': states.PAUSED
}
UPDATED_WF_EX_WITH_DESC = copy.deepcopy(UPDATED_WF_EX_JSON)
UPDATED_WF_EX_WITH_DESC['description'] = 'execution description.'
resp = self.app.put_json('/v2/executions/123', update_exec)
expected_exec = copy.deepcopy(WF_EX_JSON_WITH_DESC)
expected_exec['state'] = states.PAUSED
self.assertEqual(200, resp.status_int)
self.assertDictEqual(UPDATED_WF_EX_WITH_DESC, resp.json)
self.assertDictEqual(expected_exec, resp.json)
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
MOCK_WF_EX
mock.MagicMock(return_value=None)
)
def test_put_stop(self):
update_exec = copy.deepcopy(WF_EX_JSON)
update_exec['state'] = states.ERROR
update_exec['state_info'] = "Force"
@mock.patch.object(rpc.EngineClient, 'stop_workflow')
def test_put_state_error(self, mock_stop_wf):
update_exec = {
'id': WF_EX['id'],
'state': states.ERROR,
'state_info': 'Force'
}
with mock.patch.object(rpc.EngineClient, 'stop_workflow') as mock_pw:
wf_ex = copy.deepcopy(WF_EX)
wf_ex['state'] = states.ERROR
wf_ex['state_info'] = "Force"
mock_pw.return_value = wf_ex
wf_ex = copy.deepcopy(WF_EX)
wf_ex['state'] = states.ERROR
wf_ex['state_info'] = 'Force'
mock_stop_wf.return_value = wf_ex
resp = self.app.put_json('/v2/executions/123', update_exec)
resp = self.app.put_json('/v2/executions/123', update_exec)
update_exec['description'] = "execution description."
expected_exec = copy.deepcopy(WF_EX_JSON_WITH_DESC)
expected_exec['state'] = states.ERROR
expected_exec['state_info'] = 'Force'
self.assertEqual(200, resp.status_int)
self.assertDictEqual(update_exec, resp.json)
mock_pw.assert_called_once_with('123', 'ERROR', "Force")
self.assertEqual(200, resp.status_int)
self.assertDictEqual(expected_exec, resp.json)
mock_stop_wf.assert_called_once_with('123', 'ERROR', 'Force')
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
MOCK_WF_EX
mock.MagicMock(return_value=None)
)
def test_put_state_info_unset(self):
update_exec = copy.deepcopy(WF_EX_JSON)
update_exec['state'] = states.ERROR
update_exec.pop('state_info', None)
@mock.patch.object(rpc.EngineClient, 'resume_workflow')
def test_put_state_resume(self, mock_resume_wf):
update_exec = {
'id': WF_EX['id'],
'state': states.RUNNING
}
with mock.patch.object(rpc.EngineClient, 'stop_workflow') as mock_pw:
wf_ex = copy.deepcopy(WF_EX)
wf_ex['state'] = states.ERROR
del wf_ex.state_info
mock_pw.return_value = wf_ex
wf_ex = copy.deepcopy(WF_EX)
wf_ex['state'] = states.RUNNING
wf_ex['state_info'] = None
mock_resume_wf.return_value = wf_ex
resp = self.app.put_json('/v2/executions/123', update_exec)
resp = self.app.put_json('/v2/executions/123', update_exec)
update_exec['description'] = 'execution description.'
update_exec['state_info'] = None
expected_exec = copy.deepcopy(WF_EX_JSON_WITH_DESC)
expected_exec['state'] = states.RUNNING
expected_exec['state_info'] = None
self.assertEqual(200, resp.status_int)
self.assertDictEqual(update_exec, resp.json)
mock_pw.assert_called_once_with('123', 'ERROR', None)
self.assertEqual(200, resp.status_int)
self.assertDictEqual(expected_exec, resp.json)
mock_resume_wf.assert_called_once_with('123', env=None)
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc.EngineClient, 'stop_workflow')
def test_put_state_info_unset(self, mock_stop_wf):
update_exec = {
'id': WF_EX['id'],
'state': states.ERROR,
}
wf_ex = copy.deepcopy(WF_EX)
wf_ex['state'] = states.ERROR
del wf_ex.state_info
mock_stop_wf.return_value = wf_ex
resp = self.app.put_json('/v2/executions/123', update_exec)
expected_exec = copy.deepcopy(WF_EX_JSON_WITH_DESC)
expected_exec['state'] = states.ERROR
expected_exec['state_info'] = None
self.assertEqual(200, resp.status_int)
self.assertDictEqual(expected_exec, resp.json)
mock_stop_wf.assert_called_once_with('123', 'ERROR', None)
@mock.patch('mistral.db.v2.api.ensure_workflow_execution_exists')
@mock.patch(
'mistral.db.v2.api.update_workflow_execution',
return_value=WF_EX
)
def test_put_description(self, mock_update, mock_ensure):
update_params = {'description': 'execution description.'}
resp = self.app.put_json('/v2/executions/123', update_params)
self.assertEqual(200, resp.status_int)
mock_ensure.assert_called_once_with('123')
mock_update.assert_called_once_with('123', update_params)
@mock.patch.object(
sql_db_api,
'get_workflow_execution',
mock.MagicMock(return_value=copy.deepcopy(UPDATED_WF_EX))
)
@mock.patch(
'mistral.services.workflows.update_workflow_execution_env',
return_value=copy.deepcopy(UPDATED_WF_EX_ENV)
)
def test_put_env(self, mock_update_env):
update_exec = {'params': '{"env": {"k1": "def"}}'}
resp = self.app.put_json('/v2/executions/123', update_exec)
self.assertEqual(200, resp.status_int)
self.assertEqual(update_exec['params'], resp.json['params'])
mock_update_env.assert_called_once_with(UPDATED_WF_EX, {'k1': 'def'})
@mock.patch.object(db_api, 'update_workflow_execution', MOCK_NOT_FOUND)
def test_put_not_found(self):
@ -163,25 +245,97 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(404, resp.status_int)
def test_put_both_state_and_description(self):
self.assertRaises(
webtest_app.AppError,
self.app.put_json,
'/v2/executions/123',
WF_EX_JSON_WITH_DESC
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
def test_put_empty(self):
resp = self.app.put_json('/v2/executions/123', {}, expect_errors=True)
self.assertEqual(400, resp.status_int)
self.assertIn(
'state, description, or env is not provided for update',
resp.json['faultstring']
)
@mock.patch('mistral.db.v2.api.ensure_workflow_execution_exists')
@mock.patch('mistral.db.v2.api.update_workflow_execution',
return_value=WF_EX)
def test_put_description(self, mock_update, mock_ensure):
update_params = {'description': 'execution description.'}
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
def test_put_state_and_description(self):
resp = self.app.put_json(
'/v2/executions/123',
{'description': 'foobar', 'state': states.ERROR},
expect_errors=True
)
resp = self.app.put_json('/v2/executions/123', update_params)
self.assertEqual(400, resp.status_int)
self.assertIn(
'description must be updated separately from state',
resp.json['faultstring']
)
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
@mock.patch.object(
sql_db_api,
'get_workflow_execution',
mock.MagicMock(return_value=copy.deepcopy(UPDATED_WF_EX))
)
@mock.patch(
'mistral.db.v2.api.update_workflow_execution',
return_value=WF_EX
)
@mock.patch(
'mistral.services.workflows.update_workflow_execution_env',
return_value=copy.deepcopy(UPDATED_WF_EX_ENV_DESC)
)
def test_put_env_and_description(self, mock_update_env, mock_update):
update_exec = {
'description': 'foobar',
'params': '{"env": {"k1": "def"}}'
}
resp = self.app.put_json('/v2/executions/123', update_exec)
self.assertEqual(200, resp.status_int)
mock_ensure.assert_called_once_with('123')
mock_update.assert_called_once_with('123', update_params)
self.assertEqual(update_exec['description'], resp.json['description'])
self.assertEqual(update_exec['params'], resp.json['params'])
mock_update.assert_called_once_with('123', {'description': 'foobar'})
mock_update_env.assert_called_once_with(UPDATED_WF_EX, {'k1': 'def'})
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
mock.MagicMock(return_value=None)
)
def test_put_env_wrong_state(self):
update_exec = {
'id': WF_EX['id'],
'state': states.SUCCESS,
'params': '{"env": {"k1": "def"}}'
}
resp = self.app.put_json(
'/v2/executions/123',
update_exec,
expect_errors=True
)
self.assertEqual(400, resp.status_int)
expected_fault = (
'env can only be updated when workflow execution '
'is not running or on resume from pause'
)
self.assertIn(expected_fault, resp.json['faultstring'])
@mock.patch.object(rpc.EngineClient, 'start_workflow')
def test_post(self, f):

View File

@ -59,7 +59,7 @@ TASK_EX = models.TaskExecution(
tags=['a', 'b'],
in_context={},
runtime_context={},
workflow_execution_id='123',
workflow_execution_id=WF_EX.id,
created_at=datetime.datetime(1970, 1, 1),
updated_at=datetime.datetime(1970, 1, 1),
published=PUBLISHED,
@ -81,7 +81,7 @@ WITH_ITEMS_TASK_EX = models.TaskExecution(
tags=['a', 'b'],
in_context={},
runtime_context={},
workflow_execution_id='123',
workflow_execution_id=WF_EX.id,
created_at=datetime.datetime(1970, 1, 1),
updated_at=datetime.datetime(1970, 1, 1),
published=PUBLISHED,
@ -93,7 +93,7 @@ TASK = {
'name': 'task',
'workflow_name': 'flow',
'state': 'RUNNING',
'workflow_execution_id': '123',
'workflow_execution_id': WF_EX.id,
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00',
'result': json.dumps(RESULT),
@ -127,10 +127,6 @@ MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
MOCK_ERROR_TASK = mock.MagicMock(return_value=ERROR_TASK_EX)
MOCK_ERROR_ITEMS_TASK = mock.MagicMock(return_value=ERROR_ITEMS_TASK_EX)
MOCK_RERUN_TASKS = mock.MagicMock(side_effect=[ERROR_TASK_EX, TASK_EX])
MOCK_RERUN_ITEMS_TASKS = mock.MagicMock(
side_effect=[ERROR_ITEMS_TASK_EX, WITH_ITEMS_TASK_EX]
)
@mock.patch.object(
@ -169,7 +165,11 @@ class TestTasksController(base.FunctionalTest):
self.assertEqual(0, len(resp.json['tasks']))
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
@mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_TASKS)
@mock.patch.object(
db_api,
'get_task_execution',
mock.MagicMock(side_effect=[ERROR_TASK_EX, TASK_EX])
)
@mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX)
def test_put(self):
params = copy.deepcopy(RERUN_TASK)
@ -180,8 +180,19 @@ class TestTasksController(base.FunctionalTest):
self.assertEqual(200, resp.status_int)
self.assertDictEqual(TASK, resp.json)
rpc.EngineClient.rerun_workflow.assert_called_with(
WF_EX.id,
TASK_EX.id,
reset=params['reset'],
env=None
)
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
@mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_TASKS)
@mock.patch.object(
db_api,
'get_task_execution',
mock.MagicMock(side_effect=[ERROR_TASK_EX, TASK_EX])
)
@mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX)
def test_put_missing_reset(self):
params = copy.deepcopy(RERUN_TASK)
@ -196,7 +207,11 @@ class TestTasksController(base.FunctionalTest):
self.assertIn('Mandatory field missing', resp.json['faultstring'])
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
@mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_ITEMS_TASKS)
@mock.patch.object(
db_api,
'get_task_execution',
mock.MagicMock(side_effect=[ERROR_ITEMS_TASK_EX, WITH_ITEMS_TASK_EX])
)
@mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX)
def test_put_with_items(self):
params = copy.deepcopy(RERUN_TASK)
@ -207,6 +222,30 @@ class TestTasksController(base.FunctionalTest):
self.assertEqual(200, resp.status_int)
self.assertDictEqual(TASK, resp.json)
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
@mock.patch.object(
db_api,
'get_task_execution',
mock.MagicMock(side_effect=[ERROR_TASK_EX, TASK_EX])
)
@mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX)
def test_put_env(self):
params = copy.deepcopy(RERUN_TASK)
params['reset'] = True
params['env'] = '{"k1": "def"}'
resp = self.app.put_json('/v2/tasks/123', params=params)
self.assertEqual(200, resp.status_int)
self.assertDictEqual(TASK, resp.json)
rpc.EngineClient.rerun_workflow.assert_called_with(
WF_EX.id,
TASK_EX.id,
reset=params['reset'],
env=json.loads(params['env'])
)
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
@mock.patch.object(db_api, 'get_task_execution', MOCK_TASK)
def test_put_current_task_not_in_error(self):