Use oslo.messaging for AMQP communications

Ported task executor to oslo.messaging. The executor module is
replaced with RPC server/client named Executor and ExecutorClient
respectively to handle task execution. The old executor module is
deleted.  The engine is modified to use the ExecutorClient to send
task requests over the transport to the Executor. The launcher is
modified to start the new Executor.

Change-Id: Ibce01813e51c2220c45e05bb820b4729027446a3
Implements: blueprint mistral-oslo-messaging
This commit is contained in:
Winson Chan 2014-03-03 09:51:46 -08:00
parent 69a6b9bf28
commit 791f6f14f7
9 changed files with 407 additions and 180 deletions

View File

@ -17,11 +17,28 @@
"""Script to start instance of Task Executor."""
import eventlet
eventlet.monkey_patch()
import os
import sys
# If ../mistral/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from oslo import messaging
from oslo.config import cfg
from mistral import config
from mistral.engine.scalable.executor import server
from mistral.openstack.common import log as logging
from mistral.engine.scalable.executor import executor
LOG = logging.getLogger('mistral.cmd.task_executor')
@ -31,15 +48,26 @@ def main():
config.parse_args()
logging.setup('Mistral')
rabbit_opts = cfg.CONF.rabbit
# Please refer to the oslo.messaging documentation for transport
# configuration. The default transport for oslo.messaging is rabbitMQ.
# The available transport drivers are listed under oslo.messaging at
# ./oslo/messaging/rpc/_drivers. The drivers are prefixed with "impl".
# The transport driver is specified using the rpc_backend option in the
# default section of the oslo configuration file. The expected value
# for rpc_backend is the last part of the driver name. For example,
# the driver for rabbit is impl_rabbit and for the fake driver is
# impl_fake. The rpc_backend value for these are "rabbit" and "fake"
# respectively. There are additional options such as ssl and credential
# that can be specified depending on the driver. Please refer to the
# driver implementation for those additional options.
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host)
endpoints = [server.Executor()]
executor.start(rabbit_opts)
LOG.info("Mistral Task Executor is listening RabbitMQ"
" [host=%s, port=%s, task_queue=%s]" %
(rabbit_opts.rabbit_host,
rabbit_opts.rabbit_port,
rabbit_opts.rabbit_task_queue))
ex_server = messaging.get_rpc_server(transport, target, endpoints)
ex_server.start()
ex_server.wait()
except RuntimeError, e:
sys.stderr.write("ERROR: %s\n" % e)
sys.exit(1)

View File

@ -67,6 +67,17 @@ use_debugger = cfg.BoolOpt(
'Use at your own risk.'
)
executor_opts = [
cfg.StrOpt('host', default='0.0.0.0',
help='Name of the executor node. This can be an opaque '
'identifier. It is not necessarily a hostname, '
'FQDN, or IP address.'),
cfg.StrOpt('topic', default='executor',
help='The message topic that the executor listens on.'),
cfg.StrOpt('version', default='1.0',
help='The version of the executor.')
]
CONF = cfg.CONF
CONF.register_opts(api_opts, group='api')
@ -75,6 +86,7 @@ CONF.register_opts(pecan_opts, group='pecan')
CONF.register_opts(auth_token.opts, group='keystone')
CONF.register_opts(db_opts, group='database')
CONF.register_opts(rabbit_opts, group='rabbit')
CONF.register_opts(executor_opts, group='executor')
CONF.register_cli_opt(use_debugger)

View File

@ -14,11 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import pika
from oslo import messaging
from oslo.config import cfg
from mistral.openstack.common import log as logging
from mistral.engine.scalable.executor import client
from mistral.engine import abstract_engine as abs_eng
@ -28,30 +27,14 @@ LOG = logging.getLogger(__name__)
class ScalableEngine(abs_eng.AbstractEngine):
@classmethod
def _notify_task_executors(cls, tasks):
opts = cfg.CONF.rabbit
creds = pika.PlainCredentials(opts.rabbit_user,
opts.rabbit_password)
params = pika.ConnectionParameters(opts.rabbit_host,
opts.rabbit_port,
opts.rabbit_virtual_host,
creds)
conn = pika.BlockingConnection(params)
LOG.debug("Connected to RabbitMQ server [params=%s]" % params)
try:
channel = conn.channel()
channel.queue_declare(queue=opts.rabbit_task_queue)
for task in tasks:
msg = json.dumps(task)
channel.basic_publish(exchange='',
routing_key=opts.rabbit_task_queue,
body=msg)
LOG.info("Submitted task for execution: '%s'" % msg)
finally:
conn.close()
# TODO(m4dcoder): Use a pool for transport and client
transport = messaging.get_transport(cfg.CONF)
ex_client = client.ExecutorClient(transport)
for task in tasks:
# TODO(m4dcoder): Fill request context argument with auth info
context = {}
ex_client.handle_task(context, task=task)
LOG.info("Submitted task for execution: '%s'" % task)
@classmethod
def _run_tasks(cls, tasks):

View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo import messaging
from oslo.config import cfg
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class ExecutorClient(object):
"""
RPC client for the Executor.
"""
def __init__(self, transport):
"""Construct an RPC client for the Executor.
:param transport: a messaging transport handle
:type transport: Transport
"""
target = messaging.Target(topic=cfg.CONF.executor.topic)
self._client = messaging.RPCClient(transport, target)
def handle_task(self, cntx, **kwargs):
"""Send the task request to the Executor for execution.
:param cntx: a request context dict
:type cntx: dict
:param kwargs: a dict of method arguments
:type kwargs: dict
"""
return self._client.cast(cntx, 'handle_task', **kwargs)

View File

@ -1,136 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import pika
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral import exceptions as exc
from mistral.engine import engine
from mistral.engine import states
from mistral.engine.actions import action_factory as a_f
from mistral.engine.actions import action_helper as a_h
LOG = logging.getLogger(__name__)
def do_task_action(task):
LOG.info("Starting task action [task_id=%s, action='%s', service='%s'" %
(task['id'], task['task_dsl']['action'], task['service_dsl']))
action = a_f.create_action(task)
if a_h.is_task_synchronous(task):
try:
state, result = states.SUCCESS, action.run()
except exc.ActionException:
state, result = states.ERROR, None
engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
state, result)
else:
try:
action.run()
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.RUNNING})
except exc.ActionException:
engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
states.ERROR, None)
def handle_task_error(task, exception):
try:
db_api.start_tx()
try:
db_api.execution_update(task['workbook_name'],
task['execution_id'],
{'state': states.ERROR})
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.ERROR})
db_api.commit_tx()
finally:
db_api.end_tx()
except Exception as e:
LOG.exception(e)
def handle_task(channel, method, properties, body):
channel.basic_ack(delivery_tag=method.delivery_tag)
task = json.loads(body)
try:
LOG.info("Received a task from RabbitMQ: %s" % task)
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
db_exec = db_api.execution_get(task['workbook_name'],
task['execution_id'])
if not db_exec or not db_task:
return
if db_exec['state'] != states.RUNNING or \
db_task['state'] != states.IDLE:
return
do_task_action(db_task)
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.RUNNING})
except Exception as exc:
LOG.exception(exc)
handle_task_error(task, exc)
def start(rabbit_opts):
opts = rabbit_opts
creds = pika.PlainCredentials(opts.rabbit_user,
opts.rabbit_password)
params = pika.ConnectionParameters(opts.rabbit_host,
opts.rabbit_port,
opts.rabbit_virtual_host,
creds)
conn = pika.BlockingConnection(params)
LOG.info("Connected to RabbitMQ server [params=%s]" % params)
try:
channel = conn.channel()
channel.queue_declare(queue=opts.rabbit_task_queue)
LOG.info("Waiting for task messages...")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(handle_task,
queue=opts.rabbit_task_queue,
no_ack=False)
channel.start_consuming()
finally:
conn.close()

View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral import exceptions as exc
from mistral.engine import engine
from mistral.engine import states
from mistral.engine.actions import action_factory as a_f
from mistral.engine.actions import action_helper as a_h
LOG = logging.getLogger(__name__)
class Executor(object):
def _do_task_action(self, task):
"""Executes the action defined by the task and return result.
:param task: a task definition
:type task: dict
"""
LOG.info("Starting task action [task_id=%s, "
"action='%s', service='%s'" %
(task['id'], task['task_dsl']['action'],
task['service_dsl']))
action = a_f.create_action(task)
if a_h.is_task_synchronous(task):
try:
state, result = states.SUCCESS, action.run()
except exc.ActionException:
state, result = states.ERROR, None
engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
state, result)
else:
try:
action.run()
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.RUNNING})
except exc.ActionException:
engine.convey_task_result(task['workbook_name'],
task['execution_id'],
task['id'],
states.ERROR, None)
def _handle_task_error(self, task, exception):
"""Handle exception from the task execution.
:param task: the task corresponding to the exception
:type task: dict
:param exception: an exception thrown during the execution of the task
:type exception: Exception
"""
try:
db_api.start_tx()
try:
db_api.execution_update(task['workbook_name'],
task['execution_id'],
{'state': states.ERROR})
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.ERROR})
db_api.commit_tx()
finally:
db_api.end_tx()
except Exception as e:
LOG.exception(e)
def handle_task(self, cntx, **kwargs):
"""Handle the execution of the workbook task.
:param cntx: a request context dict
:type cntx: dict
:param kwargs: a dict of method arguments
:type kwargs: dict
"""
try:
task = kwargs.get('task', None)
if not task:
raise Exception('No task is provided to the executor.')
LOG.info("Received a task: %s" % task)
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
db_exec = db_api.execution_get(task['workbook_name'],
task['execution_id'])
if not db_exec or not db_task:
return
if db_exec['state'] != states.RUNNING or \
db_task['state'] != states.IDLE:
return
self._do_task_action(db_task)
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.RUNNING})
except Exception as exc:
LOG.exception(exc)
self._handle_task_error(task, exc)

View File

@ -13,17 +13,168 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
eventlet.monkey_patch()
import uuid
import time
import mock
from oslo import messaging
from oslo.config import cfg
from mistral.tests import base
from mistral.engine import states
from mistral.db import api as db_api
from mistral.engine.actions import actions
from mistral.engine.actions import action_types
from mistral.engine.scalable.executor import server
from mistral.engine.scalable.executor import client
class TestTaskExecutor(base.DbTestCase):
WORKBOOK_NAME = 'my_workbook'
TASK_NAME = 'my_task'
SAMPLE_WORKBOOK = {
'id': str(uuid.uuid4()),
'name': WORKBOOK_NAME,
'description': 'my description',
'definition': '{}',
'tags': [],
'scope': 'public',
'updated_at': None,
'project_id': '123',
'trust_id': '1234'
}
SAMPLE_EXECUTION = {
'id': str(uuid.uuid4()),
'workbook_name': WORKBOOK_NAME,
'task': TASK_NAME,
'state': states.RUNNING,
'updated_at': None,
'context': None
}
SAMPLE_TASK = {
'name': TASK_NAME,
'workbook_name': WORKBOOK_NAME,
'service_dsl': {
'type': action_types.REST_API,
'parameters': {
'baseUrl': 'http://localhost:8989/v1/'},
'actions': {
'my-action': {
'parameters': {
'url': 'workbooks',
'method': 'GET'}}}},
'task_dsl': {
'action': 'MyRest:my-action',
'service_name': 'MyRest',
'name': TASK_NAME},
'requires': {},
'state': states.IDLE}
SAMPLE_CONTEXT = {
'user': 'admin',
'tenant': 'mistral'
}
class TestExecutor(base.DbTestCase):
def get_transport(self):
# Get transport manually, oslo.messaging get_transport seems broken.
from stevedore import driver
from oslo.messaging import transport
# Get transport here to let oslo.messaging setup default config before
# changing the rpc_backend to the fake driver; otherwise,
# oslo.messaging will throw exception.
messaging.get_transport(cfg.CONF)
cfg.CONF.set_default('rpc_backend', 'fake')
url = transport.TransportURL.parse(cfg.CONF, None, None)
kwargs = dict(default_exchange=cfg.CONF.control_exchange,
allowed_remote_exmods=[])
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport,
invoke_on_load=True,
invoke_args=[cfg.CONF, url],
invoke_kwds=kwargs)
return transport.Transport(mgr.driver)
def mock_action_run(self):
actions.RestAction.run = mock.MagicMock(return_value={})
return actions.RestAction.run
def setUp(self):
super(TestTaskExecutor, self).setUp()
self.wb_name = "my_workbook"
# Initialize configuration for the ExecutorClient.
if not 'executor' in cfg.CONF:
cfg_grp = cfg.OptGroup(name='executor', title='Executor options')
opts = [cfg.StrOpt('host', default='0.0.0.0'),
cfg.StrOpt('topic', default='executor')]
cfg.CONF.register_group(cfg_grp)
cfg.CONF.register_opts(opts, group=cfg_grp)
# Start the Executor.
transport = self.get_transport()
target = messaging.Target(topic='executor', server='0.0.0.0')
endpoints = [server.Executor()]
self.server = messaging.get_rpc_server(transport, target,
endpoints, executor='eventlet')
self.server.start()
super(TestExecutor, self).setUp()
def tearDown(self):
super(TestTaskExecutor, self).tearDown()
# Stop the Executor.
if self.server:
self.server.stop()
super(TestExecutor, self).tearDown()
def test_handle_task(self):
#TODO(rakhmerov): need to mock out required MQ stuff and test the rest
pass
# Mock the RestAction
mock_rest_action = self.mock_action_run()
# Create a new workbook.
workbook = db_api.workbook_create(SAMPLE_WORKBOOK)
self.assertIsInstance(workbook, dict)
# Create a new execution.
execution = db_api.execution_create(SAMPLE_EXECUTION['workbook_name'],
SAMPLE_EXECUTION)
self.assertIsInstance(execution, dict)
# Create a new task.
SAMPLE_TASK['execution_id'] = execution['id']
task = db_api.task_create(SAMPLE_TASK['workbook_name'],
SAMPLE_TASK['execution_id'],
SAMPLE_TASK)
self.assertIsInstance(task, dict)
self.assertIn('id', task)
# Send the task request to the Executor.
transport = self.server.transport
ex_client = client.ExecutorClient(transport)
ex_client.handle_task(SAMPLE_CONTEXT, task=task)
# Check task execution state. There is no timeout mechanism in
# unittest. There is an example to add a custom timeout decorator that
# can wrap this test function in another process and then manage the
# process time. However, it seems more straightforward to keep the
# loop finite.
for i in range(0, 50):
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
# Ensure the request reached the executor and the action has ran.
if db_task['state'] != states.IDLE:
mock_rest_action.assert_called_once_with()
self.assertIn(db_task['state'],
[states.RUNNING, states.SUCCESS, states.ERROR])
return
time.sleep(0.01)
# Task is not being processed. Throw an exception here.
raise Exception('Timed out waiting for task to be processed.')

View File

@ -7,9 +7,11 @@ amqplib>=0.6.1
argparse
croniter
requests
kombu>=2.4.8
oslo.config>=1.2.0
oslo.messaging>=1.3.0a4
python-keystoneclient>=0.3.2
pika>=0.9.13
networkx
six>=1.5.2
SQLAlchemy
yaql==0.2.1

View File

@ -15,10 +15,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from mistralclient.api import client as cl
client = cl.Client(project_name="mistral",
mistral_url="http://localhost:8989/v1")
mistral_url="http://localhost:8989/v1",
username="admin",
api_key="secrete",
auth_url="http://localhost:5000/v3")
WB_NAME = "my_workbook"
TASK = "my_task"
@ -48,3 +53,11 @@ print "\nUploaded workbook:\n\"\n%s\"\n" %\
execution = client.executions.create(WB_NAME, TASK)
print "execution: %s" % execution
# wait until task is complete
for i in range(0, 20):
execution = client.executions.get(WB_NAME, execution.id)
print "execution: %s" % execution
if execution.state == 'SUCCESS':
break
time.sleep(1)