Engine as a standalone process

Refactored engine to run as a standalone process. The engine is
modified to use the RPC client/server design pattern from
oslo.messaging. A new launch option is included to run the
engine separately. The API server communicates with the engine
via the RPC client. A pecan hook is used to pass a common client
instance with appropriate transport configuration to the API.

Change-Id: I9ca757aedad764fd38645a5b858800d69808bb95
Implements: blueprint mistral-engine-standalone-process
This commit is contained in:
Winson Chan 2014-04-19 17:31:04 -07:00
parent ae46014ad8
commit e003849ecf
23 changed files with 628 additions and 289 deletions

View File

@ -21,12 +21,17 @@ To run Mistral API server perform the following command in a shell:
Note that an example configuration file can be found in etc/mistral.conf.example.
### Running Mistral Engines
To run Mistral Engine perform the following command in a shell:
*tox -evenv -- python mistral/cmd/launch.py --server engine --config-file path_to_config*
### Running Mistral Task Executors
To run Mistral Task Executor instance perform the following command in a shell:
*tox -evenv -- python mistral/cmd/launch.py --server executor --config-file path_to_config*
Note that at least one Executor instance should be running so that workflow tasks are processed by Mistral.
Note that at least one Engine instance and one Executor instance should be running so that workflow tasks are processed by Mistral.
### Debugging
To debug using a local engine and executor without dependencies such as RabbitMQ, create etc/mistral.conf with the following settings::

View File

@ -18,6 +18,7 @@ default_log_levels = mistral=INFO,mistral.cmd.api=INFO,mistral.api=DEBUG,wsme=DE
#rpc_backend=rabbit
# Specifies which mistral server to start by the launch script. (string value)
# Choices are all, api, engine, and executor.
#server=all
[api]
@ -29,6 +30,16 @@ port = 8989
# Mistral engine class (string value)
#engine=mistral.engine.scalable.engine
# Name of the engine node. This can be an opaque identifier.
# It is not necessarily a hostname, FQDN, or IP address. (string value)
#host=0.0.0.0
# The message topic that the engine listens on. (string value)
#topic=engine
# The version of the engine. (string value)
#version=1.0
[pecan]
auth_enable = True

View File

@ -19,10 +19,10 @@ import pecan
from oslo.config import cfg
from mistral import context as ctx
from mistral.engine import engine
from mistral.db import api as db_api
from mistral.services import periodic
from mistral.api import access_control
from mistral.api.hooks import engine
def get_pecan_config():
@ -48,13 +48,14 @@ def setup_app(config=None, transport=None):
app_conf = dict(config.app)
db_api.setup_db()
engine.load_engine(transport)
##TODO(akuznetsov) move this to trigger scheduling to separate process
periodic.setup()
periodic.setup(transport)
app = pecan.make_app(
app_conf.pop('root'),
hooks=lambda: [ctx.ContextHook()],
hooks=lambda: [ctx.ContextHook(),
engine.EngineHook(transport=transport)],
logging=getattr(config, 'logging', {}),
**app_conf
)

View File

@ -16,6 +16,7 @@
import json
import pecan
from pecan import rest
from pecan import abort
from wsme import types as wtypes
@ -26,7 +27,7 @@ from mistral.api.controllers.v1 import task
from mistral.openstack.common import log as logging
from mistral.api.controllers import resource
from mistral.db import api as db_api
from mistral.engine import engine
LOG = logging.getLogger(__name__)
@ -56,7 +57,8 @@ class Execution(resource.Resource):
for key, val in d.items():
if hasattr(e, key):
if key == 'context' and val:
# Nonetype check for dictionary must be explicit
if key == 'context' and val is not None:
val = json.dumps(val)
setattr(e, key, val)
@ -101,11 +103,13 @@ class ExecutionsController(rest.RestController):
def post(self, workbook_name, execution):
LOG.debug("Create execution [workbook_name=%s, execution=%s]" %
(workbook_name, execution))
try:
context = None
if execution.context:
context = json.loads(execution.context)
engine = pecan.request.context['engine']
values = engine.start_workflow_execution(execution.workbook_name,
execution.task,
context)

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pecan
from pecan import rest
from pecan import abort
from wsme import types as wtypes
@ -22,7 +23,7 @@ import wsmeext.pecan as wsme_pecan
from mistral.openstack.common import log as logging
from mistral.api.controllers import resource
from mistral.db import api as db_api
from mistral.engine import engine
LOG = logging.getLogger(__name__)
@ -67,6 +68,7 @@ class TasksController(rest.RestController):
(workbook_name, execution_id, id, task))
# TODO(rakhmerov): pass task result once it's implemented
engine = pecan.request.context['engine']
values = engine.convey_task_result(workbook_name,
execution_id,
id,

View File

View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pecan.hooks import PecanHook
from mistral import engine
from mistral.engine import client
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class EngineHook(PecanHook):
def __init__(self, transport=None):
self.transport = engine.get_transport(transport)
self.engine = client.EngineClient(self.transport)
def before(self, state):
state.request.context['engine'] = self.engine

View File

@ -37,12 +37,10 @@ from oslo import messaging
from oslo.config import cfg
from mistral import config
from mistral.engine import engine
from mistral import engine
from mistral.engine.scalable.executor import server
from mistral.api import app
from wsgiref import simple_server
from mistral.openstack.common import log as logging
@ -50,18 +48,25 @@ LOG = logging.getLogger(__name__)
def launch_executor(transport):
# TODO(rakhmerov): This is a temporary hack.
# We have to initialize engine in executor process because
# executor now calls engine.convey_task_result() directly.
engine.load_engine(transport)
target = messaging.Target(topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host)
endpoints = [server.Executor()]
ex_server = messaging.get_rpc_server(transport, target, endpoints)
endpoints = [server.Executor(transport)]
ex_server = messaging.get_rpc_server(
transport, target, endpoints, executor='eventlet')
ex_server.start()
ex_server.wait()
def launch_engine(transport):
target = messaging.Target(topic=cfg.CONF.engine.topic,
server=cfg.CONF.engine.host)
endpoints = [engine.Engine(transport)]
en_server = messaging.get_rpc_server(
transport, target, endpoints, executor='eventlet')
en_server.start()
en_server.wait()
def launch_api(transport):
host = cfg.CONF.api.host
port = cfg.CONF.api.port
@ -75,9 +80,9 @@ def launch_api(transport):
def launch_all(transport):
# Launch the servers on different threads.
t1 = eventlet.spawn(launch_executor, transport)
t2 = eventlet.spawn(launch_api, transport)
t1.wait()
t2.wait()
t2 = eventlet.spawn(launch_engine, transport)
t3 = eventlet.spawn(launch_api, transport)
t1.wait() and t2.wait() and t3.wait()
def main():
@ -90,6 +95,7 @@ def main():
launch_options = {
'all': launch_all,
'api': launch_api,
'engine': launch_engine,
'executor': launch_executor
}

View File

@ -31,7 +31,15 @@ api_opts = [
engine_opts = [
cfg.StrOpt('engine', default='mistral.engine.scalable.engine',
help='Mistral engine class')
help='Mistral engine class'),
cfg.StrOpt('host', default='0.0.0.0',
help='Name of the engine node. This can be an opaque '
'identifier. It is not necessarily a hostname, '
'FQDN, or IP address.'),
cfg.StrOpt('topic', default='engine',
help='The message topic that the engine listens on.'),
cfg.StrOpt('version', default='1.0',
help='The version of the engine.')
]
pecan_opts = [
@ -81,7 +89,7 @@ executor_opts = [
launch_opt = cfg.StrOpt(
'server',
default='all',
choices=('all', 'api', 'executor'),
choices=('all', 'api', 'engine', 'executor'),
help='Specifies which mistral server to start by the launch script.'
)

View File

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo import messaging
from oslo.config import cfg
from mistral.openstack.common import importutils
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def get_transport(transport=None):
return (transport if transport else messaging.get_transport(cfg.CONF))
class Engine(object):
def __init__(self, transport=None):
module_name = cfg.CONF.engine.engine
module = importutils.import_module(module_name)
self.transport = get_transport(transport)
self.backend = module.get_engine()
self.backend.transport = self.transport
def start_workflow_execution(self, cntx, **kwargs):
"""Starts a workflow execution based on the specified workbook name
and target task.
:param cntx: a request context dict
:type cntx: dict
:param kwargs: a dict of method arguments
:type kwargs: dict
:return: Workflow execution.
"""
workbook_name = kwargs.get('workbook_name')
task_name = kwargs.get('task_name')
context = kwargs.get('context', None)
return self.backend.start_workflow_execution(
workbook_name, task_name, context)
def stop_workflow_execution(self, cntx, **kwargs):
"""Stops the workflow execution with the given id.
:param cntx: a request context dict
:type cntx: dict
:param kwargs: a dict of method arguments
:type kwargs: dict
:return: Workflow execution.
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
return self.backend.stop_workflow_execution(
workbook_name, execution_id)
def convey_task_result(self, cntx, **kwargs):
"""Conveys task result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a task once task action has been performed. One of the
clients of this method is Mistral REST API server that receives
task result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:param cntx: a request context dict
:type cntx: dict
:param kwargs: a dict of method arguments
:type kwargs: dict
:return: Task.
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
task_id = kwargs.get('task_id')
state = kwargs.get('state')
result = kwargs.get('result')
return self.backend.convey_task_result(
workbook_name, execution_id, task_id, state, result)
def get_workflow_execution_state(self, cntx, **kwargs):
"""Gets the workflow execution state.
:param cntx: a request context dict
:type cntx: dict
:param kwargs: a dict of method arguments
:type kwargs: dict
:return: Current workflow state.
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
return self.backend.get_workflow_execution_state(
workbook_name, execution_id)
def get_task_state(self, cntx, **kwargs):
"""Gets task state.
:param cntx: a request context dict
:type cntx: dict
:param kwargs: a dict of method arguments
:type kwargs: dict
:return: Current task state.
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
task_id = kwargs.get('task_id')
return self.backend.get_task_state(
workbook_name, execution_id, task_id)

123
mistral/engine/client.py Normal file
View File

@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo import messaging
from oslo.config import cfg
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class EngineClient(object):
"""
RPC client for the Engine.
"""
def __init__(self, transport):
"""Construct an RPC client for the Engine.
:param transport: a messaging transport handle
:type transport: Transport
"""
target = messaging.Target(topic=cfg.CONF.engine.topic)
self._client = messaging.RPCClient(transport, target)
def start_workflow_execution(self, workbook_name, task_name, context=None):
"""Starts a workflow execution based on the specified workbook name
and target task.
:param workbook_name: Workbook name
:param task_name: Target task name
:param context: Execution context which defines a workflow input
:return: Workflow execution.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'task_name': task_name,
'context': context}
return self._client.call(cntx, 'start_workflow_execution', **kwargs)
def stop_workflow_execution(self, workbook_name, execution_id):
"""Stops the workflow execution with the given id.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id}
return self._client.call(cntx, 'stop_workflow_execution', **kwargs)
def convey_task_result(self, workbook_name, execution_id,
task_id, state, result):
"""Conveys task result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a task once task action has been performed. One of the
clients of this method is Mistral REST API server that receives
task result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:param state: New task state.
:param result: Task result data.
:return: Task.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id,
'task_id': task_id,
'state': state,
'result': result}
return self._client.call(cntx, 'convey_task_result', **kwargs)
def get_workflow_execution_state(self, workbook_name, execution_id):
"""Gets the workflow execution state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Current workflow state.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id}
return self._client.call(
cntx, 'get_workflow_execution_state', **kwargs)
def get_task_state(self, workbook_name, execution_id, task_id):
"""Gets task state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:return: Current task state.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'executioin_id': execution_id,
'task_id': task_id}
return self._client.call(cntx, 'get_task_state', **kwargs)

View File

@ -1,99 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Facade interface to Mistral Engine that provides control over lifecycle
of workflow executions.
"""
from mistral.openstack.common import importutils
from oslo.config import cfg
_engine = None
def load_engine(transport):
global _engine
module_name = cfg.CONF.engine.engine
module = importutils.import_module(module_name)
_engine = module.get_engine()
_engine.transport = transport
def start_workflow_execution(workbook_name, task_name, context=None):
"""Starts a workflow execution based on the specified workbook name
and target task.
:param workbook_name: Workbook name
:param task_name: Target task name
:param context: Execution context which defines a workflow input
:return: Workflow execution.
"""
return _engine.start_workflow_execution(workbook_name, task_name, context)
def stop_workflow_execution(workbook_name, execution_id):
"""Stops the workflow execution with the given id.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
return _engine.stop_workflow_execution(workbook_name, execution_id)
def convey_task_result(workbook_name, execution_id, task_id, state, result):
"""Conveys task result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a task once task action has been performed. One of the
clients of this method is Mistral REST API server that receives
task result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:param state: New task state.
:param result: Task result data.
:return: Task.
"""
return _engine.convey_task_result(workbook_name, execution_id, task_id,
state, result)
def get_workflow_execution_state(workbook_name, execution_id):
"""Gets the workflow execution state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Current workflow state.
"""
return _engine.get_workflow_execution_state(workbook_name, execution_id)
def get_task_state(workbook_name, execution_id, task_id):
"""Gets task state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:return: Current task state.
"""
return _engine.get_task_state(workbook_name, execution_id, task_id)

View File

@ -19,7 +19,8 @@ from oslo.config import cfg
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral import exceptions as exc
from mistral.engine import engine
from mistral import engine
from mistral.engine import client
from mistral.engine import states
from mistral.actions import action_factory as a_f
@ -30,6 +31,10 @@ WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
class Executor(object):
def __init__(self, transport=None):
self.transport = engine.get_transport(transport)
self.engine = client.EngineClient(self.transport)
def _do_task_action(self, task):
"""Executes the action defined by the task and return result.
@ -49,19 +54,19 @@ class Executor(object):
except exc.ActionException:
state, result = states.ERROR, None
engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
state, result)
self.engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
state, result)
else:
try:
action.run()
except exc.ActionException:
engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
states.ERROR, None)
self.engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
states.ERROR, None)
def _handle_task_error(self, task, exception):
"""Handle exception from the task execution.

View File

@ -14,9 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db import api as db_api
from mistral.engine import engine
from mistral import engine
from mistral.engine import client
from mistral.openstack.common import log
from mistral.openstack.common import periodic_task
from mistral.openstack.common import threadgroup
@ -25,10 +25,17 @@ from mistral import dsl_parser as parser
from mistral.services import scheduler as sched
from mistral.services import trusts
LOG = log.getLogger(__name__)
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self, transport=None):
super(MistralPeriodicTasks, self).__init__()
self.transport = engine.get_transport(transport)
self.engine = client.EngineClient(self.transport)
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def scheduler_triggers(self, ctx):
LOG.debug('Processing next Scheduler triggers.')
@ -40,15 +47,15 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
try:
task = parser.get_workbook(
wb['definition']).get_trigger_task_name(trigger['name'])
engine.start_workflow_execution(wb['name'], task)
self.engine.start_workflow_execution(wb['name'], task)
finally:
sched.set_next_execution_time(trigger)
context.set_ctx(None)
def setup():
def setup(transport):
tg = threadgroup.ThreadGroup()
pt = MistralPeriodicTasks()
pt = MistralPeriodicTasks(transport=transport)
tg.add_dynamic_timer(
pt.run_periodic_tasks,

View File

@ -21,7 +21,7 @@ from mistral import exceptions as ex
from webtest.app import AppError
from mistral.tests.api import base
from mistral.db import api as db_api
from mistral.engine import engine
from mistral.engine import client
# TODO: later we need additional tests verifying all the errors etc.
@ -75,7 +75,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_EXEC, canonize(resp.json))
@mock.patch.object(engine, 'start_workflow_execution',
@mock.patch.object(client.EngineClient, 'start_workflow_execution',
mock.MagicMock(return_value=EXECS[0]))
def test_post(self):
new_exec = EXECS[0].copy()
@ -86,7 +86,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 201)
self.assertDictEqual(EXECS[0], canonize(resp.json))
@mock.patch.object(engine, 'start_workflow_execution',
@mock.patch.object(client.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=ex.MistralException))
def test_post_throws_exception(self):
with self.assertRaises(AppError) as context:

View File

@ -18,7 +18,7 @@ import mock
from mistral.tests.api import base
from mistral.db import api as db_api
from mistral.engine import engine
from mistral.engine import client
# TODO: later we need additional tests verifying all the errors etc.
@ -48,7 +48,7 @@ class TestTasksController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(TASKS[0], resp.json)
@mock.patch.object(engine, "convey_task_result",
@mock.patch.object(client.EngineClient, "convey_task_result",
mock.MagicMock(return_value=UPDATED_TASK))
def test_put(self):
resp = self.app.put_json(

View File

@ -32,11 +32,11 @@ importutils.import_module("mistral.config")
from mistral.db.sqlalchemy import api as db_api
from mistral.openstack.common import log as logging
from mistral.engine import engine
from mistral.engine.scalable.executor import server
from mistral.engine.scalable import engine as concrete_engine
from mistral.openstack.common.db.sqlalchemy import session
from mistral import version
from mistral.engine import client
from mistral.engine.scalable import engine
from mistral.engine.scalable.executor import server
RESOURCES_PATH = 'tests/resources/'
@ -120,12 +120,39 @@ class DbTestCase(BaseTest):
class EngineTestCase(DbTestCase):
transport = get_fake_transport()
def __init__(self, *args, **kwargs):
super(EngineTestCase, self).__init__(*args, **kwargs)
self.transport = get_fake_transport()
engine.load_engine(self.transport)
self.engine = concrete_engine.get_engine()
self.engine.transport = self.transport
self.engine = client.EngineClient(self.transport)
@classmethod
def mock_task_result(cls, workbook_name, execution_id,
task_id, state, result):
"""
Mock the engine convey_task_results to send request directly
to the engine instead of going through the oslo.messaging transport.
"""
return engine.ScalableEngine.convey_task_result(
workbook_name, execution_id, task_id, state, result)
@classmethod
def mock_start_workflow(cls, workbook_name, task_name, context=None):
"""
Mock the engine start_workflow_execution to send request directly
to the engine instead of going through the oslo.messaging transport.
"""
return engine.ScalableEngine.start_workflow_execution(
workbook_name, task_name, context)
@classmethod
def mock_get_workflow_state(cls, workbook_name, execution_id):
"""
Mock the engine get_workflow_execution_state to send request directly
to the engine instead of going through the oslo.messaging transport.
"""
return engine.ScalableEngine.get_workflow_execution_state(
workbook_name, execution_id)
@classmethod
def mock_run_tasks(cls, tasks):
@ -133,6 +160,15 @@ class EngineTestCase(DbTestCase):
Mock the engine _run_tasks to send requests directly to the task
executor instead of going through the oslo.messaging transport.
"""
executor = server.Executor()
executor = server.Executor(transport=cls.transport)
for task in tasks:
executor.handle_task({}, task=task)
@classmethod
def mock_handle_task(cls, cntx, **kwargs):
"""
Mock the executor handle_task to send requests directory to the task
executor instead of going through the oslo.messaging transport.
"""
executor = server.Executor(transport=cls.transport)
return executor.handle_task(cntx, **kwargs)

View File

@ -17,13 +17,14 @@ import mock
from oslo.config import cfg
from mistral.tests import base
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral.actions import std_actions
from mistral import expressions
from mistral.engine.scalable import engine
from mistral.engine import states
from mistral.tests import base
from mistral.engine import client
LOG = logging.getLogger(__name__)
@ -38,15 +39,23 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
#TODO(rakhmerov): add more tests for errors, execution stop etc.
@mock.patch.object(
client.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@mock.patch.object(
client.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
db_api, 'workbook_get',
mock.MagicMock(
return_value={'definition': base.get_resource('test_rest.yaml')}))
@mock.patch.object(
std_actions.HTTPAction, 'run',
mock.MagicMock(return_value={'state': states.SUCCESS}))
class TestScalableEngine(base.EngineTestCase):
@mock.patch.object(engine.ScalableEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(std_actions.HTTPAction, "run",
mock.MagicMock(return_value="result"))
@mock.patch.object(
engine.ScalableEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
def test_engine_one_task(self):
execution = self.engine.start_workflow_execution(WB_NAME, "create-vms",
CONTEXT)
@ -62,14 +71,13 @@ class TestScalableEngine(base.EngineTestCase):
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)
@mock.patch.object(engine.ScalableEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(std_actions.HTTPAction, "run",
mock.MagicMock(return_value="result"))
@mock.patch.object(
client.EngineClient, 'get_workflow_execution_state',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_get_workflow_state))
@mock.patch.object(
engine.ScalableEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
def test_engine_multiple_tasks(self):
execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms",
CONTEXT)
@ -107,19 +115,14 @@ class TestScalableEngine(base.EngineTestCase):
self.engine.get_workflow_execution_state(
WB_NAME, execution['id']))
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(std_actions.HTTPAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(states, "get_state_by_http_status_code",
mock.MagicMock(return_value=states.SUCCESS))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
states, "get_state_by_http_status_code",
mock.MagicMock(return_value=states.SUCCESS))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_sync_task(self):
execution = self.engine.start_workflow_execution(WB_NAME,
"create-vm-nova",
@ -131,17 +134,11 @@ class TestScalableEngine(base.EngineTestCase):
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(std_actions.HTTPAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_tasks_on_success_finish(self):
# Start workflow.
execution = self.engine.start_workflow_execution(WB_NAME,
@ -203,17 +200,11 @@ class TestScalableEngine(base.EngineTestCase):
self.assertEqual(execution['state'], states.SUCCESS)
self._assert_multiple_items(tasks, 4, state=states.SUCCESS)
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource("test_rest.yaml")
}))
@mock.patch.object(std_actions.HTTPAction, "run",
mock.MagicMock(return_value={'state': states.SUCCESS}))
@mock.patch.object(expressions, "evaluate",
mock.MagicMock(side_effect=lambda x, y: x))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
def test_engine_tasks_on_error_finish(self):
# Start workflow.
execution = self.engine.start_workflow_execution(WB_NAME,

View File

@ -18,18 +18,31 @@ import eventlet
eventlet.monkey_patch()
import uuid
import time
import mock
from oslo.config import cfg
from mistral.tests import base
from mistral.cmd import launch
from mistral.openstack.common import log as logging
from mistral.openstack.common import importutils
from mistral.engine import states
from mistral.db import api as db_api
from mistral.actions import std_actions
from mistral.engine.scalable.executor import client
from mistral.engine import client as engine
from mistral.engine.scalable.executor import client as executor
# We need to make sure that all configuration properties are registered.
importutils.import_module("mistral.config")
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK_NAME = 'my_workbook'
TASK_NAME = 'my_task'
TASK_NAME = 'create-vms'
SAMPLE_WORKBOOK = {
'id': str(uuid.uuid4()),
@ -77,27 +90,19 @@ SAMPLE_CONTEXT = {
class TestExecutor(base.DbTestCase):
def mock_action_run(self):
std_actions.HTTPAction.run = mock.MagicMock(return_value={})
return std_actions.HTTPAction.run
def setUp(self):
super(TestExecutor, self).setUp()
# Run the Executor in the background.
def __init__(self, *args, **kwargs):
super(TestExecutor, self).__init__(*args, **kwargs)
self.transport = base.get_fake_transport()
self.ex_thread = eventlet.spawn(launch.launch_executor, self.transport)
def tearDown(self):
# Stop the Executor.
self.ex_thread.kill()
super(TestExecutor, self).tearDown()
@mock.patch.object(
executor.ExecutorClient, 'handle_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_handle_task))
@mock.patch.object(
std_actions.HTTPAction, 'run', mock.MagicMock(return_value={}))
@mock.patch.object(
engine.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
def test_handle_task(self):
# Mock HTTP action.
mock_rest_action = self.mock_action_run()
# Create a new workbook.
workbook = db_api.workbook_create(SAMPLE_WORKBOOK)
self.assertIsInstance(workbook, dict)
@ -116,28 +121,11 @@ class TestExecutor(base.DbTestCase):
self.assertIn('id', task)
# Send the task request to the Executor.
ex_client = client.ExecutorClient(self.transport)
ex_client = executor.ExecutorClient(self.transport)
ex_client.handle_task(SAMPLE_CONTEXT, task=task)
# Check task execution state. There is no timeout mechanism in
# unittest. There is an example to add a custom timeout decorator that
# can wrap this test function in another process and then manage the
# process time. However, it seems more straightforward to keep the
# loop finite.
for i in range(0, 50):
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
# Ensure the request reached the executor and the action has ran.
if db_task['state'] != states.IDLE:
# We have to wait sometime due to time interval between set
# task state to RUNNING and invocation action.run()
time.sleep(0.1)
mock_rest_action.assert_called_once_with()
self.assertIn(db_task['state'],
[states.RUNNING, states.SUCCESS, states.ERROR])
return
time.sleep(0.1)
# Task is not being processed. Throw an exception here.
raise Exception('Timed out waiting for task to be processed.')
# Check task execution state.
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
self.assertEqual(db_task['state'], states.SUCCESS)

View File

@ -25,6 +25,7 @@ from mistral.db import api as db_api
from mistral.engine.scalable import engine
from mistral.actions import std_actions
from mistral.engine import states
from mistral.engine import client
from mistral.utils.openstack import keystone
@ -60,6 +61,15 @@ def create_workbook(definition_path):
})
@mock.patch.object(
client.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@mock.patch.object(
client.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
class DataFlowTest(base.EngineTestCase):
def _check_in_context_execution(self, task):
self.assertIn('__execution', task['in_context'])
@ -70,9 +80,6 @@ class DataFlowTest(base.EngineTestCase):
self.assertEqual(task['execution_id'], exec_dict['id'])
self.assertIn('task', exec_dict)
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_two_dependent_tasks(self):
CTX = copy.copy(CONTEXT)
@ -141,9 +148,6 @@ class DataFlowTest(base.EngineTestCase):
del build_greeting_task['in_context']['__execution']
self.assertDictEqual(CTX, build_greeting_task['in_context'])
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_task_with_two_dependencies(self):
CTX = copy.copy(CONTEXT)
@ -240,9 +244,6 @@ class DataFlowTest(base.EngineTestCase):
},
send_greeting_task['output'])
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_two_subsequent_tasks(self):
CTX = copy.copy(CONTEXT)
@ -312,9 +313,6 @@ class DataFlowTest(base.EngineTestCase):
del build_greeting_task['in_context']['__execution']
self.assertDictEqual(CTX, build_greeting_task['in_context'])
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
def test_three_subsequent_tasks(self):
CTX = copy.copy(CONTEXT)
@ -415,15 +413,13 @@ class DataFlowTest(base.EngineTestCase):
del send_greeting_task['in_context']['__execution']
self.assertDictEqual(CTX, send_greeting_task['in_context'])
@mock.patch.object(std_actions.HTTPAction, "run",
mock.MagicMock(return_value={'state': states.RUNNING}))
@mock.patch.object(keystone, "client_for_trusts",
mock.Mock(
return_value=mock.MagicMock(user_id=USER_ID,
auth_token=TOKEN)))
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
std_actions.HTTPAction, 'run',
mock.MagicMock(return_value={'state': states.RUNNING}))
@mock.patch.object(
keystone, 'client_for_trusts',
mock.Mock(return_value=mock.MagicMock(user_id=USER_ID,
auth_token=TOKEN)))
def test_add_token_to_context(self):
task_name = "create-vms"

View File

@ -24,6 +24,7 @@ from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.db import api as db_api
from mistral.engine import client
from mistral.engine.scalable import engine
from mistral.actions import std_actions
from mistral.engine import states
@ -57,16 +58,23 @@ class FailBeforeSuccessMocker(object):
return "result"
@mock.patch.object(engine.ScalableEngine, '_run_tasks',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(db_api, "workbook_get",
mock.MagicMock(return_value={
'definition': base.get_resource(
"retry_task/retry_task.yaml")
}))
@mock.patch.object(std_actions.HTTPAction, "run",
mock.MagicMock(return_value="result"))
@mock.patch.object(
client.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@mock.patch.object(
client.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
db_api, 'workbook_get',
mock.MagicMock(
return_value={
'definition': base.get_resource('retry_task/retry_task.yaml')}))
@mock.patch.object(
std_actions.HTTPAction, 'run',
mock.MagicMock(return_value='result'))
class TaskRetryTest(base.EngineTestCase):
def test_no_retry(self):

View File

@ -0,0 +1,95 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
eventlet.monkey_patch()
import time
import mock
from oslo.config import cfg
from mistral.tests import base
from mistral.openstack.common import log as logging
from mistral.openstack.common import importutils
from mistral.cmd import launch
from mistral.engine import states
from mistral.db import api as db_api
from mistral.actions import std_actions
# We need to make sure that all configuration properties are registered.
importutils.import_module("mistral.config")
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WB_NAME = 'my_workbook'
CONTEXT = None # TODO(rakhmerov): Use a meaningful value.
class TestTransport(base.EngineTestCase):
def setUp(self):
super(TestTransport, self).setUp()
# Run the Engine and Executor in the background.
self.en_thread = eventlet.spawn(launch.launch_engine, self.transport)
self.ex_thread = eventlet.spawn(launch.launch_executor, self.transport)
def tearDown(self):
# Stop the Engine and the Executor.
self.en_thread.kill()
self.ex_thread.kill()
super(TestTransport, self).tearDown()
@mock.patch.object(
db_api, 'workbook_get',
mock.MagicMock(
return_value={'definition': base.get_resource('test_rest.yaml')}))
@mock.patch.object(
std_actions.HTTPAction, 'run', mock.MagicMock(return_value={}))
def test_transport(self):
"""
Test if engine request traversed through the oslo.messaging transport.
"""
execution = self.engine.start_workflow_execution(
WB_NAME, 'create-vms', CONTEXT)
task = db_api.tasks_get(WB_NAME, execution['id'])[0]
# Check task execution state. There is no timeout mechanism in
# unittest. There is an example to add a custom timeout decorator that
# can wrap this test function in another process and then manage the
# process time. However, it seems more straightforward to keep the
# loop finite.
for i in range(0, 50):
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
# Ensure the request reached the executor and the action has ran.
if db_task['state'] != states.IDLE:
# We have to wait sometime due to time interval between set
# task state to RUNNING and invocation action.run()
time.sleep(0.1)
self.assertIn(db_task['state'],
[states.RUNNING, states.SUCCESS, states.ERROR])
return
time.sleep(0.1)
# Task is not being processed. Throw an exception here.
raise Exception('Timed out waiting for task to be processed.')

View File

@ -1,18 +1,17 @@
Services:
MyRest:
type: REST_API
parameters:
baseUrl: http://localhost:8989/v1/
actions:
my-action:
parameters:
url: workbooks
method: GET
Namespaces:
MyRest:
actions:
my-action:
class: std.http
base-parameters:
url: http://localhost:8989/v1/workbooks
method: GET
output:
Workflow:
tasks:
my_task:
action: MyRest:my-action
tasks:
my_task:
action: MyRest.my-action
# events:
# my_event: