Refactor the local engine to use an in process executor

Refactored launch script to start the API and executor on the same process
so the transport for a oslo.messaging fake driver can be shared. A transport
attribute is added to the abstract engine as a class attribute to reference
the transport object.

Change-Id: I4b8684ebded9eb993749f2503380e078087edd7d
Implements: blueprint mistral-inproc-executor
This commit is contained in:
Winson Chan 2014-03-20 16:26:56 -07:00
parent 025343c050
commit e9a0f14db2
11 changed files with 125 additions and 134 deletions

View File

@ -17,24 +17,26 @@ This will install necessary virtual environments and run all the project tests.
### Running Mistral API server
To run Mistral API server perform the following command in a shell:
*tox -evenv -- python mistral/cmd/api.py --config-file path_to_config*
*tox -evenv -- python mistral/cmd/launch.py --server api --config-file path_to_config*
Note that an example configuration file can be found in etc/mistral.conf.example.
### Running Mistral Task Executors
To run Mistral Task Executor instance perform the following command in a shell::
To run Mistral Task Executor instance perform the following command in a shell:
*tox -evenv -- python mistral/cmd/task_executor.py --config-file path_to_config*
*tox -evenv -- python mistral/cmd/launch.py --server executor --config-file path_to_config*
Note that at least one Executor instance should be running so that workflow tasks are processed by Mistral.
### Debugging
To debug the engine, create etc/mistral.conf with the settings::
[engine]
engine = mistral.engine.local.engine
To debug using a local engine and executor without dependencies such as RabbitMQ, create etc/mistral.conf with the following settings::
[DEFAULT]
rpc_backend = fake
[pecan]
auth_enable = False
and run in pdb, PyDev or PyCharm::
mistral/cmd/api --config-file etc/mistral.conf --use-debugger
mistral/cmd/launch.py --server all --config-file etc/mistral.conf --use-debugger

View File

@ -14,6 +14,12 @@ default_log_levels = mistral=INFO,mistral.cmd.api=INFO,mistral.api=DEBUG,wsme=DE
# Uncomment this option to get more fine-grained control over logging configuration
#log_config_append = etc/logging.conf
# Options for oslo.messaging
#rpc_backend=rabbit
# Specifies which mistral server to start by the launch script. (string value)
#server=all
[api]
# Host and port to bind the API server to
host = 0.0.0.0
@ -46,4 +52,15 @@ auth_port=5000
admin_user=admin
admin_password=password
auth_protocol=http
admin_tenant_name=admin
admin_tenant_name=admin
[executor]
# Name of the executor node. This can be an opaque identifier.
# It is not necessarily a hostname, FQDN, or IP address. (string value)
#host=0.0.0.0
# The message topic that the executor listens on. (string value)
#topic=executor
# The version of the executor. (string value)
#version=1.0

View File

@ -41,14 +41,14 @@ def get_pecan_config():
return pecan.configuration.conf_from_dict(cfg_dict)
def setup_app(config=None):
def setup_app(config=None, transport=None):
if not config:
config = get_pecan_config()
app_conf = dict(config.app)
db_api.setup_db()
engine.load_engine()
engine.load_engine(transport)
##TODO(akuznetsov) move this to trigger scheduling to separate process
periodic.setup()

View File

@ -1,62 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Script to start Mistral API service."""
import eventlet
import os
import sys
from wsgiref import simple_server
from oslo.config import cfg
from mistral.api import app
from mistral import config
from mistral.openstack.common import log as logging
eventlet.monkey_patch(
os=True,
select=True,
socket=True,
thread=False if '--use-debugger' in sys.argv else True,
time=True)
LOG = logging.getLogger('mistral.cmd.api')
def main():
try:
config.parse_args()
logging.setup('Mistral')
host = cfg.CONF.api.host
port = cfg.CONF.api.port
server = simple_server.make_server(host, port, app.setup_app())
LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" %
(host, port, os.getpid()))
server.serve_forever()
except RuntimeError, e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -1,8 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -15,14 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Script to start instance of Task Executor."""
import sys
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(
os=True,
select=True,
socket=True,
thread=False if '--use-debugger' in sys.argv else True,
time=True)
import os
import sys
# If ../mistral/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
@ -32,17 +33,51 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from oslo import messaging
from oslo.config import cfg
from mistral import config
from mistral.engine import engine
from mistral.engine.scalable.executor import server
from mistral.api import app
from wsgiref import simple_server
from mistral.openstack.common import log as logging
LOG = logging.getLogger('mistral.cmd.task_executor')
LOG = logging.getLogger(__name__)
def launch_executor(transport):
# TODO(rakhmerov): This is a temporary hack.
# We have to initialize engine in executor process because
# executor now calls engine.convey_task_result() directly.
engine.load_engine(transport)
target = messaging.Target(topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host)
endpoints = [server.Executor()]
ex_server = messaging.get_rpc_server(transport, target, endpoints)
ex_server.start()
ex_server.wait()
def launch_api(transport):
host = cfg.CONF.api.host
port = cfg.CONF.api.port
server = simple_server.make_server(host, port,
app.setup_app(transport=transport))
LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" %
(host, port, os.getpid()))
server.serve_forever()
def launch_all(transport):
# Launch the servers on different threads.
t1 = eventlet.spawn(launch_executor, transport)
t2 = eventlet.spawn(launch_api, transport)
t1.wait()
t2.wait()
def main():
@ -50,10 +85,13 @@ def main():
config.parse_args()
logging.setup('Mistral')
# TODO(rakhmerov): This is a temporary hack.
# We have to initialize engine in executor process because
# executor now calls engine.convey_task_result() directly.
engine.load_engine()
# Map cli options to appropriate functions. The cli options are
# registered in mistral's config.py.
launch_options = {
'all': launch_all,
'api': launch_api,
'executor': launch_executor
}
# Please refer to the oslo.messaging documentation for transport
# configuration. The default transport for oslo.messaging is rabbitMQ.
@ -68,13 +106,10 @@ def main():
# that can be specified depending on the driver. Please refer to the
# driver implementation for those additional options.
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host)
endpoints = [server.Executor()]
ex_server = messaging.get_rpc_server(transport, target, endpoints)
ex_server.start()
ex_server.wait()
# Launch server(s).
launch_options[cfg.CONF.server](transport)
except RuntimeError, e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)

View File

@ -78,6 +78,13 @@ executor_opts = [
help='The version of the executor.')
]
launch_opt = cfg.StrOpt(
'server',
default='all',
choices=('all', 'api', 'executor'),
help='Specifies which mistral server to start by the launch script.'
)
CONF = cfg.CONF
CONF.register_opts(api_opts, group='api')
@ -89,6 +96,7 @@ CONF.register_opts(rabbit_opts, group='rabbit')
CONF.register_opts(executor_opts, group='executor')
CONF.register_cli_opt(use_debugger)
CONF.register_cli_opt(launch_opt)
CONF.import_opt('verbose', 'mistral.openstack.common.log')
CONF.import_opt('debug', 'mistral.openstack.common.log')

View File

@ -31,6 +31,8 @@ LOG = logging.getLogger(__name__)
class AbstractEngine(object):
transport = None
@classmethod
@abc.abstractmethod
def _run_tasks(cls, tasks):

View File

@ -25,11 +25,12 @@ from oslo.config import cfg
_engine = None
def load_engine():
def load_engine(transport):
global _engine
module_name = cfg.CONF.engine.engine
module = importutils.import_module(module_name)
_engine = module.get_engine()
_engine.transport = transport
def start_workflow_execution(workbook_name, task_name, context=None):

View File

@ -28,8 +28,9 @@ class ScalableEngine(abs_eng.AbstractEngine):
@classmethod
def _notify_task_executors(cls, tasks):
# TODO(m4dcoder): Use a pool for transport and client
transport = messaging.get_transport(cfg.CONF)
ex_client = client.ExecutorClient(transport)
if not cls.transport:
cls.transport = messaging.get_transport(cfg.CONF)
ex_client = client.ExecutorClient(cls.transport)
for task in tasks:
# TODO(m4dcoder): Fill request context argument with auth info
context = {}

View File

@ -23,6 +23,11 @@ from mistral import version
from mistral.db.sqlalchemy import api as db_api
from mistral.openstack.common.db.sqlalchemy import session
from stevedore import driver
from oslo.config import cfg
from oslo import messaging
from oslo.messaging import transport
RESOURCES_PATH = 'tests/resources/'
@ -33,6 +38,23 @@ def get_resource(resource_name):
RESOURCES_PATH + resource_name)).read()
def get_fake_transport():
# Get transport here to let oslo.messaging setup default config
# before changing the rpc_backend to the fake driver; otherwise,
# oslo.messaging will throw exception.
messaging.get_transport(cfg.CONF)
cfg.CONF.set_default('rpc_backend', 'fake')
url = transport.TransportURL.parse(cfg.CONF, None, None)
kwargs = dict(default_exchange=cfg.CONF.control_exchange,
allowed_remote_exmods=[])
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport,
invoke_on_load=True,
invoke_args=[cfg.CONF, url],
invoke_kwds=kwargs)
return transport.Transport(mgr.driver)
class BaseTest(unittest2.TestCase):
def setUp(self):
super(BaseTest, self).setUp()

View File

@ -20,15 +20,12 @@ import uuid
import time
import mock
from oslo import messaging
from oslo.config import cfg
from mistral.tests import base
from mistral.cmd import launch
from mistral.engine import states
from mistral.db import api as db_api
from mistral.engine.actions import actions
from mistral.engine.actions import action_types
from mistral.engine.scalable.executor import server
from mistral.engine.scalable.executor import client
@ -86,51 +83,20 @@ SAMPLE_CONTEXT = {
class TestExecutor(base.DbTestCase):
def get_transport(self):
# Get transport manually, oslo.messaging get_transport seems broken.
from stevedore import driver
from oslo.messaging import transport
# Get transport here to let oslo.messaging setup default config before
# changing the rpc_backend to the fake driver; otherwise,
# oslo.messaging will throw exception.
messaging.get_transport(cfg.CONF)
cfg.CONF.set_default('rpc_backend', 'fake')
url = transport.TransportURL.parse(cfg.CONF, None, None)
kwargs = dict(default_exchange=cfg.CONF.control_exchange,
allowed_remote_exmods=[])
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport,
invoke_on_load=True,
invoke_args=[cfg.CONF, url],
invoke_kwds=kwargs)
return transport.Transport(mgr.driver)
def mock_action_run(self):
actions.RestAction.run = mock.MagicMock(return_value={})
return actions.RestAction.run
def setUp(self):
# Initialize configuration for the ExecutorClient.
super(TestExecutor, self).setUp()
if not 'executor' in cfg.CONF:
cfg_grp = cfg.OptGroup(name='executor', title='Executor options')
opts = [cfg.StrOpt('host', default='0.0.0.0'),
cfg.StrOpt('topic', default='executor')]
cfg.CONF.register_group(cfg_grp)
cfg.CONF.register_opts(opts, group=cfg_grp)
# Start the Executor.
transport = self.get_transport()
target = messaging.Target(topic='executor', server='0.0.0.0')
endpoints = [server.Executor()]
self.server = messaging.get_rpc_server(transport, target,
endpoints, executor='eventlet')
self.server.start()
# Run the Executor in the background.
self.transport = base.get_fake_transport()
self.ex_thread = eventlet.spawn(launch.launch_executor, self.transport)
def tearDown(self):
# Stop the Executor.
if self.server:
self.server.stop()
self.ex_thread.kill()
super(TestExecutor, self).tearDown()
@ -156,8 +122,7 @@ class TestExecutor(base.DbTestCase):
self.assertIn('id', task)
# Send the task request to the Executor.
transport = self.server.transport
ex_client = client.ExecutorClient(transport)
ex_client = client.ExecutorClient(self.transport)
ex_client.handle_task(SAMPLE_CONTEXT, task=task)
# Check task execution state. There is no timeout mechanism in