Expose stop_workflow in API

A client sometimes needs to stop a workflow, most often when there is
an obvious error and it makes no sense to wait for completion, or
when the execution has ended up in a zombie state for any reason.

Provide the ability to force-stop a workflow, passing a final state of
SUCCESS or ERROR and a status message carrying client information or
error details.

- [x] Add public method and tests to default_engine.
- [x] Expose stop_workflow in rpc.
- [x] Expose in API.
- [x] Add a functional test.

Change-Id: I1bee7d9d780c1b1d09bc146843c751802e68f8fc
This commit is contained in:
Dmitri Zimine
2015-02-05 23:39:52 -08:00
parent 67454a11df
commit 6a1352ac6a
11 changed files with 236 additions and 37 deletions

View File

@@ -147,17 +147,20 @@ class ExecutionsController(rest.RestController):
)
new_state = execution.state
msg = execution.state_info
if new_state == states.PAUSED:
exec_db = rpc.get_engine_client().pause_workflow(id)
elif new_state == states.RUNNING:
exec_db = rpc.get_engine_client().resume_workflow(id)
elif new_state in [states.SUCCESS, states.ERROR]:
exec_db = rpc.get_engine_client().stop_workflow(id, new_state, msg)
else:
# To prevent changing state in other cases throw a message.
raise exc.DataAccessException(
"Error. Can not change state to %s. "
"Only valid states '%s' or '%s' allowed."
% (new_state, states.RUNNING, states.PAUSED)
"Can not change state to %s. Allowed states are: '%s" %
(new_state, ", ".join([states.RUNNING, states.PAUSED,
states.SUCCESS, states.ERROR]))
)
return Execution.from_dict(exec_db if isinstance(exec_db, dict)

View File

@@ -73,6 +73,19 @@ class Engine(object):
"""
raise NotImplementedError
@abc.abstractmethod
def stop_workflow(self, execution_id, state, message):
"""Stops workflow execution.
:param execution_id: Workflow execution id.
:param state: State assigned to the workflow. Permitted states are
SUCCESS or ERROR.
:param message: Optional information string.
:return: Workflow execution.
"""
raise NotImplementedError
@abc.abstractmethod
def rollback_workflow(self, execution_id):
"""Rolls back workflow execution.

View File

@@ -303,7 +303,7 @@ class RunTask(EngineCommand):
class FailWorkflow(EngineCommand):
def run_local(self, exec_db, wf_handler, cause_task_db=None):
wf_handler.fail_workflow()
wf_handler.stop_workflow(states.ERROR)
return False
def run_remote(self, exec_db, wf_handler, cause_task_db=None):
@@ -312,7 +312,7 @@ class FailWorkflow(EngineCommand):
class SucceedWorkflow(EngineCommand):
def run_local(self, exec_db, wf_handler, cause_task_db=None):
wf_handler.succeed_workflow()
wf_handler.stop_workflow(states.SUCCESS)
return False
def run_remote(self, exec_db, wf_handler, cause_task_db=None):

View File

@@ -215,8 +215,21 @@ class DefaultEngine(base.Engine):
return exec_db
# TODO(dzimine): make public, add to RPC, expose in API
@u.log_exec(LOG)
def stop_workflow(self, execution_id, state, message=None):
with db_api.transaction():
exec_db = db_api.get_execution(execution_id)
wf_handler = wfh_factory.create_workflow_handler(exec_db)
return wf_handler.stop_workflow(state, message)
@u.log_exec(LOG)
def rollback_workflow(self, execution_id):
# TODO(rakhmerov): Implement.
raise NotImplementedError
def _fail_workflow(self, execution_id, err, task_id=None):
"""Private helper to fail workflow on exceptions."""
with db_api.transaction():
err_msg = str(err)
@@ -228,8 +241,7 @@ class DefaultEngine(base.Engine):
return
wf_handler = wfh_factory.create_workflow_handler(exec_db)
wf_handler.fail_workflow(err_msg)
wf_handler.stop_workflow(states.ERROR, err_msg)
if task_id:
# Note(dzimine): Don't call self.engine_client:
@@ -240,12 +252,6 @@ class DefaultEngine(base.Engine):
wf_utils.TaskResult(error=err_msg)
)
return exec_db
def rollback_workflow(self, execution_id):
# TODO(rakhmerov): Implement.
raise NotImplementedError
@staticmethod
def _canonize_workflow_params(params):
# Resolve environment parameter.

View File

@@ -1,4 +1,5 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -139,6 +140,29 @@ class EngineServer(object):
return self._engine.resume_workflow(execution_id)
def stop_workflow(self, rpc_ctx, execution_id, state, message=None):
"""Receives calls over RPC to stop workflows on engine.
Sets execution state to SUCCESS or ERROR. No more tasks will be
scheduled. Running tasks won't be killed, but their results
will be ignored.
:param rpc_ctx: RPC request context.
:param execution_id: Workflow execution id.
:param state: State assigned to the workflow. Permitted states are
SUCCESS or ERROR.
:param message: Optional information string.
:return: Workflow execution.
"""
LOG.info(
"Received RPC request 'stop_workflow'[rpc_ctx=%s, execution_id=%s,"
" state=%s, message=%s]" % (rpc_ctx, execution_id, state, message)
)
return self._engine.stop_workflow(execution_id, state, message)
def rollback_workflow(self, rpc_ctx, execution_id):
"""Receives calls over RPC to rollback workflows on engine.
@@ -240,6 +264,27 @@ class EngineClient(base.Engine):
execution_id=execution_id
)
def stop_workflow(self, execution_id, state, message=None):
"""Stops workflow execution with given status.
Once stopped, the workflow is complete with SUCCESS or ERROR,
and can not be resumed.
:param execution_id: Workflow execution id
:param state: State assigned to the workflow: SUCCESS or ERROR
:param message: Optional information string
:return: Workflow execution, model.Execution
"""
return self._client.call(
auth_ctx.ctx(),
'stop_workflow',
execution_id=execution_id,
state=state,
message=message
)
def rollback_workflow(self, execution_id):
"""Rolls back the workflow with the given execution id.

View File

@@ -307,7 +307,7 @@ class ExecutionTestsV2(base.TestCase):
self.assertEqual(execution['id'], body['id'])
@test.attr(type='sanity')
def test_update_execution(self):
def test_update_execution_pause(self):
_, execution = self.client.create_execution(self.direct_wf)
resp, body = self.client.update_execution(
execution['id'], '{"state": "PAUSED"}')
@@ -315,6 +315,16 @@ class ExecutionTestsV2(base.TestCase):
self.assertEqual(200, resp.status)
self.assertEqual('PAUSED', body['state'])
@test.attr(type='sanity')
def test_update_execution_fail(self):
_, execution = self.client.create_execution(self.direct_wf)
resp, body = self.client.update_execution(
execution['id'], '{"state": "ERROR", "state_info": "Forced"}')
self.assertEqual(200, resp.status)
self.assertEqual('ERROR', body['state'])
self.assertEqual('Forced', body['state_info'])
@test.attr(type='negative')
def test_get_nonexistent_execution(self):
self.assertRaises(exceptions.NotFound, self.client.get_object,

View File

@@ -87,6 +87,24 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_EXEC, resp.json)
@mock.patch.object(db_api, 'ensure_execution_exists', MOCK_EXECUTION)
def test_put_stop(self):
update_exec = copy.copy(EXEC)
update_exec['state'] = states.ERROR
update_exec['state_info'] = "Force"
with mock.patch.object(rpc.EngineClient, 'stop_workflow') as mock_pw:
exec_db = copy.copy(EXEC_DB)
exec_db['state'] = states.ERROR
exec_db['state_info'] = "Force"
mock_pw.return_value = exec_db
resp = self.app.put_json('/v2/executions/123', update_exec)
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(update_exec, resp.json)
mock_pw.assert_called_once_with('123', 'ERROR', "Force")
@mock.patch.object(db_api, 'update_execution', MOCK_NOT_FOUND)
def test_put_not_found(self):
resp = self.app.put_json(

View File

@@ -92,8 +92,6 @@ ENVIRONMENT_DB = models.Environment(
MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB)
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
# TODO(rakhmerov): Add more advanced tests including various capabilities.
class DefaultEngineTest(base.DbTestCase):
def setUp(self):
@@ -291,9 +289,48 @@ class DefaultEngineTest(base.DbTestCase):
self._assert_single_item(exec_db.tasks, name='task1')
self._assert_single_item(exec_db.tasks, name='task2')
def test_stop_workflow(self):
# TODO(akhmerov): Implement.
pass
def test_stop_workflow_fail(self):
# Start workflow.
exec_db = self.engine.start_workflow(
'wb.wf1', {'param1': 'Hey', 'param2': 'Hi'}, task_name="task2")
# Re-read execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
self.engine.stop_workflow(exec_db.id, 'ERROR', "Stop this!")
# Re-read from DB again
exec_db = db_api.get_execution(exec_db.id)
self.assertEqual('ERROR', exec_db.state)
self.assertEqual("Stop this!", exec_db.state_info)
def test_stop_workflow_succeed(self):
# Start workflow.
exec_db = self.engine.start_workflow(
'wb.wf1', {'param1': 'Hey', 'param2': 'Hi'}, task_name="task2")
# Re-read execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
self.engine.stop_workflow(exec_db.id, 'SUCCESS', "Like this, done")
# Re-read from DB again
exec_db = db_api.get_execution(exec_db.id)
self.assertEqual('SUCCESS', exec_db.state)
self.assertEqual("Like this, done", exec_db.state_info)
def test_stop_workflow_bad_status(self):
exec_db = self.engine.start_workflow(
'wb.wf1', {'param1': 'Hey', 'param2': 'Hi'}, task_name="task2")
# Re-read execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
self.assertRaises(
exc.WorkflowException,
self.engine.stop_workflow,
exec_db.id,
'PAUSE'
)
def test_resume_workflow(self):
# TODO(akhmerov): Implement.

View File

@@ -18,7 +18,6 @@ from oslo.config import cfg
from mistral.db.v2 import api as db_api
from mistral.engine1 import default_engine as de
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.services import scheduler
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine1 import base
@@ -26,7 +25,6 @@ from mistral.workbook import parser as spec_parser
from mistral.workflow import states
from mistral.workflow import utils
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')

View File

@@ -0,0 +1,64 @@
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db.v2 import api as db_api
from mistral.services import scheduler
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine1 import base
from mistral.workflow import states
class WorkflowStopTest(base.EngineTestCase):
    """Functional tests for force-stopping a workflow execution.

    Every test starts the two-task direct workflow 'wf' in setUp() and
    then calls engine.stop_workflow() with a final state and a message,
    verifying that both are stored on the execution.
    """

    def setUp(self):
        """Create the 'wf' workflow, start the scheduler and run 'wf'.

        The id of the started execution is stored in self.exec_id.
        """
        super(WorkflowStopTest, self).setUp()

        # NOTE(review): task2 is delayed by a 'wait-before' policy,
        # presumably so that the execution is still in progress when the
        # tests stop it - confirm.
        WORKFLOW = """
        version: '2.0'

        wf:
          type: direct
          tasks:
            task1:
              action: std.echo output="Echo"
              on-complete:
                - task2

            task2:
              action: std.echo output="foo"
              policies:
                wait-before: 3
        """
        wf_service.create_workflows(WORKFLOW)

        # Note(dzimine): scheduler.setup is necessary for wait- policies.
        thread_group = scheduler.setup()
        self.addCleanup(thread_group.stop)

        self.exec_id = self.engine.start_workflow('wf', {}).id

    def test_stop_succeeded(self):
        """Stopping with SUCCESS stores the SUCCESS state and message."""
        self.engine.stop_workflow(self.exec_id, states.SUCCESS, "Force stop")

        self._await(lambda: self.is_execution_success(self.exec_id))

        # Re-read the execution from DB to check the persisted values.
        exec_db = db_api.get_execution(self.exec_id)

        self.assertEqual(states.SUCCESS, exec_db.state)
        self.assertEqual("Force stop", exec_db.state_info)

    def test_stop_failed(self):
        """Stopping with ERROR stores the ERROR state and message."""
        self.engine.stop_workflow(self.exec_id, states.ERROR, "Failure")

        self._await(lambda: self.is_execution_error(self.exec_id))

        # Re-read the execution from DB to check the persisted values.
        exec_db = db_api.get_execution(self.exec_id)

        self.assertEqual(states.ERROR, exec_db.state)
        self.assertEqual("Failure", exec_db.state_info)

View File

@@ -247,12 +247,26 @@ class WorkflowHandler(object):
def is_paused_or_completed(self):
return states.is_paused_or_completed(self.exec_db.state)
def succeed_workflow(self):
"""Completes workflow with SUCCESS status.
def stop_workflow(self, state, message=None):
"""Completes workflow as succeeded or failed.
Sets execution state to SUCCESS or ERROR. No more tasks will be
scheduled. Running tasks won't be killed, but their results
will be ignored.
:param state: 'SUCCESS' or 'ERROR'
:param message: State info text with context of the operation.
:return: Execution object.
"""
self._set_execution_state(states.SUCCESS)
if state not in [states.SUCCESS, states.ERROR]:
msg = ("Illegal state %s: provided while stopping workflow "
"execution id=%s. State can be %s or %s. "
"Stop request IGNORED." %
(state, self.exec_db.id, states.SUCCESS, states.ERROR))
raise exc.WorkflowException(msg)
self._set_execution_state(state, message)
return self.exec_db
@@ -265,15 +279,6 @@ class WorkflowHandler(object):
return self.exec_db
def fail_workflow(self, err_msg=None):
"""Stops workflow with ERROR state.
:return: Execution object.
"""
self._set_execution_state(states.ERROR, err_msg)
return self.exec_db
def resume_workflow(self):
"""Resumes workflow this handler is associated with.
@@ -309,9 +314,9 @@ class WorkflowHandler(object):
self.exec_db.state = state
self.exec_db.state_info = state_info
else:
msg = ("Can't change workflow state "
"[execution=%s, state=%s -> %s]" %
(self.exec_db, cur_state, state))
msg = ("Can't change workflow execution state from %s to %s. "
"[workflow=%s, execution_id=%s]" %
(cur_state, state, self.exec_db.wf_name, self.exec_db.id))
raise exc.WorkflowException(msg)