Various fixes related to end-to-end testing

* Fixed several problems in scalable engine
* Fixed engine interface
* Fixed several problems in REST API
* Changed task DB model
* Fixed DSL parser
* Added scripts for end-to-end testing

Change-Id: Icbcc46d11a6c687d0ae3aad339ca5f5134c50195
This commit is contained in:
Renat Akhmerov 2013-12-18 18:15:55 +07:00
parent cf4a016143
commit 50bebda734
26 changed files with 327 additions and 80 deletions

View File

@ -21,6 +21,7 @@ from mistral.api.controllers.v1 import task
from mistral.openstack.common import log as logging
from mistral.api.controllers import resource
from mistral.db import api as db_api
from mistral.engine import engine
LOG = logging.getLogger(__name__)
@ -73,7 +74,8 @@ class ExecutionsController(rest.RestController):
LOG.debug("Create listener [workbook_name=%s, execution=%s]" %
(workbook_name, execution))
values = db_api.execution_create(workbook_name, execution.to_dict())
values = engine.start_workflow_execution(execution.workbook_name,
execution.target_task)
return Execution.from_dict(values)

View File

@ -20,6 +20,7 @@ import wsmeext.pecan as wsme_pecan
from mistral.openstack.common import log as logging
from mistral.api.controllers import resource
from mistral.db import api as db_api
from mistral.engine import engine
LOG = logging.getLogger(__name__)
@ -63,8 +64,11 @@ class TasksController(rest.RestController):
"[workbook_name=%s, execution_id=%s, id=%s, task=%s]" %
(workbook_name, execution_id, id, task))
values = db_api.task_update(workbook_name, execution_id, id,
task.to_dict())
# TODO(rakhmerov): pass task result once it's implemented
values = engine.convey_task_result(workbook_name,
execution_id,
id,
task.state, None)
return Task.from_dict(values)

View File

@ -222,7 +222,8 @@ def event_update(event_id, values, session=None):
@to_dict
def get_next_events(time):
@session_aware()
def get_next_events(time, session=None):
query = model_query(m.Event)
query = query.filter(m.Event.next_execution_time < time)
query = query.order_by(m.Event.next_execution_time)

View File

@ -88,6 +88,7 @@ class Task(mb.MistralBase):
workbook_name = sa.Column(sa.String(80))
execution_id = sa.Column(sa.String(36))
description = sa.Column(sa.String())
action = sa.Column(st.JsonDictType())
task_dsl = sa.Column(st.JsonDictType())
service_dsl = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20))
tags = sa.Column(st.JsonListType())

View File

@ -50,7 +50,12 @@ class Parser(object):
return events
def get_tasks(self):
return self.doc["Workflow"]["tasks"]
tasks = self.doc["Workflow"]["tasks"]
for task_name, task_dsl in tasks.iteritems():
task_dsl["service_name"] = task_dsl["action"].split(':')[0]
return tasks
def get_action(self, task_action_name):
service_name = task_action_name.split(':')[0]

View File

@ -39,9 +39,9 @@ def start_workflow_execution(workbook_name, target_task_name):
:param workbook_name: Workbook name
:param target_task_name: Target task name
:return: Workflow execution identifier.
:return: Workflow execution.
"""
IMPL.start_workflow_execution(workbook_name, target_task_name)
return IMPL.start_workflow_execution(workbook_name, target_task_name)
def stop_workflow_execution(workbook_name, execution_id):
@ -49,8 +49,9 @@ def stop_workflow_execution(workbook_name, execution_id):
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
IMPL.stop_workflow_execution(workbook_name, execution_id)
return IMPL.stop_workflow_execution(workbook_name, execution_id)
def convey_task_result(workbook_name, execution_id, task_id, state, result):
@ -70,9 +71,10 @@ def convey_task_result(workbook_name, execution_id, task_id, state, result):
:param task_id: Task id.
:param state: New task state.
:param result: Task result data.
:return: Task.
"""
IMPL.convey_task_result(workbook_name, execution_id, task_id, state,
result)
return IMPL.convey_task_result(workbook_name, execution_id, task_id,
state, result)
def get_workflow_execution_state(workbook_name, execution_id):
@ -82,7 +84,7 @@ def get_workflow_execution_state(workbook_name, execution_id):
:param execution_id: Workflow execution id.
:return: Current workflow state.
"""
IMPL.get_workflow_execution_state(workbook_name, execution_id)
return IMPL.get_workflow_execution_state(workbook_name, execution_id)
def get_task_state(workbook_name, execution_id, task_id):
@ -93,4 +95,4 @@ def get_task_state(workbook_name, execution_id, task_id):
:param task_id: Task id.
:return: Current task state.
"""
IMPL.get_task_state(workbook_name, execution_id, task_id)
return IMPL.get_task_state(workbook_name, execution_id, task_id)

View File

@ -40,7 +40,7 @@ def _notify_task_executors(tasks):
creds)
conn = pika.BlockingConnection(params)
LOG.info("Connected to RabbitMQ server [params=%s]" % params)
LOG.debug("Connected to RabbitMQ server [params=%s]" % params)
try:
channel = conn.channel()
@ -58,7 +58,7 @@ def _notify_task_executors(tasks):
def start_workflow_execution(workbook_name, target_task_name):
wb = db_api.workbook_get(workbook_name)
wb_dsl = dsl.Parser(wb.definition)
wb_dsl = dsl.Parser(wb["definition"])
dsl_tasks = workflow.find_workflow_tasks(wb_dsl, target_task_name)
@ -76,12 +76,11 @@ def start_workflow_execution(workbook_name, target_task_name):
for dsl_task in dsl_tasks:
task = db_api.task_create(workbook_name, execution["id"], {
"workbook_name": workbook_name,
"execution_id": execution["id"],
"name": dsl_task["name"],
"action": wb_dsl.get_action(dsl_task["action"]),
"task_dsl": dsl_task,
"service_dsl": wb_dsl.get_service(dsl_task["service_name"]),
"state": states.IDLE,
"tags": dsl_task["tags"]
"tags": dsl_task.get("tags", None)
})
tasks.append(task)
@ -89,48 +88,62 @@ def start_workflow_execution(workbook_name, target_task_name):
_notify_task_executors(tasks)
db_api.commit_tx()
return execution
finally:
db_api.end_tx()
def stop_workflow_execution(workbook_name, execution_id):
db_api.execution_update(workbook_name, execution_id,
{"state": states.STOPPED})
return db_api.execution_update(workbook_name, execution_id,
{"state": states.STOPPED})
def convey_task_result(workbook_name, execution_id, task_id, state, result):
db_api.start_tx()
try:
# Update task state
#TODO(rakhmerov): validate state transition
# Update task state.
task = db_api.task_update(workbook_name, execution_id, task_id,
{"state": state, "result": result})
if task["state"] == states.ERROR:
db_api.execution_update(workbook_name, execution_id, {
execution = db_api.execution_update(workbook_name, execution_id, {
"state": states.ERROR
})
db_api.commit_tx()
return
LOG.info("Execution finished with error: %s" % execution)
return task
execution = db_api.execution_get(workbook_name, execution_id)
if states.is_stopped_or_finished(execution["state"]):
# The execution has finished or stopped temporarily.
db_api.commit_tx()
return
return task
# Determine what tasks need to be started.
tasks = db_api.tasks_get(workbook_name, execution_id)
if workflow.is_finished(tasks):
if workflow.is_success(tasks):
db_api.execution_update(workbook_name, execution_id, {
"state": states.SUCCESS
})
db_api.commit_tx()
return
LOG.info("Execution finished with success: %s" % execution)
return task
_notify_task_executors(workflow.find_tasks_to_start(tasks))
db_api.commit_tx()
return task
finally:
db_api.end_tx()
@ -139,7 +152,9 @@ def get_workflow_execution_state(workbook_name, execution_id):
execution = db_api.execution_get(workbook_name, execution_id)
if not execution:
raise exception.EngineException("Workflow execution not found.")
raise exception.EngineException("Workflow execution not found "
"[workbook_name=%s, execution_id=%s]"
% (workbook_name, execution_id))
return execution["state"]

View File

@ -15,6 +15,36 @@
# limitations under the License.
import requests
from mistral.engine.scalable.executor import action_types
import mistral.exceptions as exc
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def create_action(task):
action_type = task['service_dsl']['type']
action_name = task['task_dsl']['action'].split(':')[1]
action_params = task['service_dsl']['actions'][action_name]['parameters']
task_params = task['task_dsl'].get('parameters', None)
if action_type == action_types.REST_API:
url = task['service_dsl']['parameters']['baseUrl'] +\
action_params['url']
headers = {
'Mistral-Workbook-Name': task['workbook_name'],
'Mistral-Execution-Id': task['execution_id'],
'Mistral-Task-Id': task['id'],
}
return RestAction(url=url,
params=task_params,
method=action_params['method'],
headers=headers)
else:
raise exc.InvalidActionException("Action type is not supported: %s" %
action_type)
class BaseAction(object):
@ -22,7 +52,7 @@ class BaseAction(object):
pass
class RESTAction(BaseAction):
class RestAction(BaseAction):
def __init__(self, url, params={}, method="GET", headers=None):
self.url = url
self.params = params
@ -30,7 +60,13 @@ class RESTAction(BaseAction):
self.headers = headers
def do_action(self):
requests.request(self.method, self.url, params=self.params,
headers=self.headers)
LOG.info("Sending action HTTP request "
"[method=%s, url=%s, params=%s, headers=%s]" %
(self.method, self.url, self.params, self.headers))
resp = requests.request(self.method, self.url, params=self.params,
headers=self.headers)
LOG.info("Received HTTP response:\n%s\n%s" %
(resp.status_code, resp.content))
# TODO(rakhmerov): add other types of actions.

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Valid action types."""
REST_API = 'REST_API'
_ALL = [REST_API]
def is_valid(action_type):
return action_type in _ALL

View File

@ -14,22 +14,70 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import pika
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral.engine import states
from mistral.engine.scalable.executor import action as act
LOG = logging.getLogger(__name__)
def do_task_action(task):
LOG.info("Starting task action [task_id=%s, action='%s', service='%s'" %
(task['id'], task['task_dsl']['action'], task['service_dsl']))
act.create_action(task).do_action()
def handle_task_error(task, exc):
try:
db_api.start_tx()
try:
db_api.execution_update(task['workbook_name'],
task['execution_id'],
{'state': states.ERROR})
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.ERROR})
db_api.commit_tx()
finally:
db_api.end_tx()
except Exception as e:
LOG.exception(e)
def handle_task(channel, method, properties, body):
channel.basic_ack(delivery_tag=method.delivery_tag)
LOG.info("Received a message from RabbitMQ: " + body)
#TODO(rakhmerov): implement task execution logic
# 1. Fetch task and execution state from DB
# 2. If execution is in "RUNNING" state and task state is "IDLE"
# then do task action (send a signal)
task = json.loads(body)
try:
LOG.info("Received a task from RabbitMQ: %s" % task)
db_task = db_api.task_get(task['workbook_name'],
task['execution_id'],
task['id'])
db_exec = db_api.execution_get(task['workbook_name'],
task['execution_id'])
if not db_exec or not db_task:
return
if db_exec['state'] != states.RUNNING or \
db_task['state'] != states.IDLE:
return
do_task_action(db_task)
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.RUNNING})
except Exception as exc:
LOG.exception(exc)
handle_task_error(task, exc)
def start(rabbit_opts):

View File

@ -44,11 +44,11 @@ def find_tasks_to_start(tasks):
def is_finished(tasks):
for task in tasks:
if not states.is_finished(task['state']):
return False
return all(states.is_finished(task['state']) for task in tasks)
return True
def is_success(tasks):
return all(task['state'] == states.SUCCESS for task in tasks)
def _get_subgraph(full_graph, task_name):

View File

@ -16,6 +16,10 @@
class DataAccessException(Exception):
def __init__(self, message=None):
super(Exception, self).__init__(message)
class InvalidActionException(Exception):
def __init__(self, message=None):
super(Exception, self).__init__(message)

View File

@ -1,13 +0,0 @@
from mistral import config
from mistral.engine.scalable import engine
from mistral.openstack.common import log as logging
config.parse_args()
logging.setup("mistral")
tasks = []
for i in range(1000000):
tasks.append({"id": i, "name": "task%s" % i, "execution_id": 1})
engine._notify_task_executors(tasks)

View File

@ -18,6 +18,7 @@ import mock
from mistral.tests.api import base
from mistral.db import api as db_api
from mistral.engine import engine
# TODO: later we need additional tests verifying all the errors etc.
@ -69,7 +70,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertDictEqual(updated_exec, resp.json)
def test_post(self):
db_api.execution_create = mock.MagicMock(return_value=EXECS[0])
engine.start_workflow_execution = mock.MagicMock(return_value=EXECS[0])
resp = self.app.post_json('/v1/workbooks/my_workbook/executions',
EXECS[0])

View File

@ -18,6 +18,7 @@ import mock
from mistral.tests.api import base
from mistral.db import api as db_api
from mistral.engine import engine
# TODO: later we need additional tests verifying all the errors etc.
@ -60,7 +61,7 @@ class TestTasksController(base.FunctionalTest):
updated_task = TASKS[0].copy()
updated_task['state'] = 'STOPPED'
db_api.task_update = mock.MagicMock(return_value=updated_task)
engine.convey_task_result = mock.MagicMock(return_value=updated_task)
resp = self.app.put_json(
'/v1/workbooks/my_workbook/executions/123/tasks/1',

View File

@ -10,35 +10,30 @@ Services:
method: GET
task-parameters:
flavor_id:
optional: false
image_id:
optional: false
backup-vm:
parameters:
url: url_for_backup
method: GET
task-parameters:
server_id:
optional: false
attach-volume:
parameters:
url: url_for_attach
method: GET
task-parameters:
size:
optional: false
mnt_path:
optional: false
format-volume:
parameters:
url: url_for_format
method: GET
task-parameters:
volume_id:
optional: false
server_id:
optional: false
Workflow:
tasks:
@ -49,28 +44,27 @@ Workflow:
flavor_id: 42
attach-volumes:
dependsOn: [create-vms]
action: Nova:attach-volume
parameters:
size:
optional: false
mnt_path:
optional: false
dependsOn: [create-vms]
format-volumes:
dependsOn: [attach-volumes]
action: Nova:format-volume
parameters:
server_id:
optional: false
dependsOn: [attach-volumes]
backup-vms:
dependsOn: [create-vms]
action: Nova:backup-vm
parameters:
server_id:
optional: false
dependsOn: [create-vms]
events:
create-vms:
type: periodic
tasks: create-vms
parameters:
cron-pattern: "* * * * *"
cron-pattern: "* * * * *"

View File

@ -217,6 +217,8 @@ TASKS = [
'name': u'my_task1',
'description': u'my description',
'dependencies': [u'my_task2', u'my_task3'],
"task_dsl": None,
"service_dsl": None,
"action": {u'name': u'Nova:create-vm'},
"state": u'IDLE',
"tags": [u'deployment'],
@ -229,6 +231,8 @@ TASKS = [
'name': u'my_task2',
'description': u'my description',
'dependencies': [u'my_task4', u'my_task5'],
"task_dsl": None,
"service_dsl": None,
"action": {u'name': u'Cinder:create-volume'},
"state": u'IDLE',
"tags": [u'deployment'],

View File

@ -133,7 +133,7 @@ class ResourceManager(object):
def _raise_api_exception(self, resp):
error_data = get_json(resp)
raise APIException(error_data["error_message"])
raise APIException(error_data["faultstring"])
def get_json(response):

View File

@ -42,7 +42,7 @@ class Client(object):
if "v2.0" in auth_url:
raise RuntimeError('Mistral support only v3 '
'kyestone api')
print keystone_client
keystone = keystone_client.Client(username=username,
password=api_key,
token=input_auth_token,
@ -70,7 +70,7 @@ class Client(object):
break
if not mistral_url:
mistral_url = "http://localhost:8386/v1.0"
mistral_url = "http://localhost:8989/v1"
self.http_client = httpclient.HTTPClient(mistral_url,
token,
project_id,

View File

@ -37,7 +37,8 @@ class TaskManager(base.ResourceManager):
'state': state
}
return self._update('/workbooks/%s/executions' % workbook_name, data)
return self._update('/workbooks/%s/executions/%s/tasks/%s' %
(workbook_name, execution_id, id), data)
def list(self, workbook_name, execution_id):
self._ensure_not_empty(workbook_name=workbook_name,

View File

@ -69,4 +69,5 @@ class WorkbookManager(base.ResourceManager):
def get_definition(self, name):
self._ensure_not_empty(name=name)
return self.client.http_client.get('/workbooks/%s/definition' % name)
return self.client.http_client.get('/workbooks/%s/definition'
% name).content

View File

@ -23,9 +23,10 @@ from mistralclient.api import client
class FakeResponse(object):
"""Fake response for testing Mistral Client."""
def __init__(self, status_code, json_values={}):
def __init__(self, status_code, json_values={}, content=None):
self.status_code = status_code
self.json_values = json_values
self.content = content
def json(self):
return self.json_values

View File

@ -114,7 +114,8 @@ class TestWorkbooks(base.BaseClientTest):
self.workbooks.upload_definition("my_workbook", WB_DEF)
def test_get_definition(self):
self._client.http_client.get = mock.MagicMock(return_value=WB_DEF)
self._client.http_client.get =\
mock.MagicMock(return_value=base.FakeResponse(200, None, WB_DEF))
text = self.workbooks.get_definition("my_workbook")

View File

@ -0,0 +1,57 @@
import sys
from mistralclient.api import client as cl
client = cl.Client(project_name="mistral",
mistral_url="http://localhost:8989/v1")
WB_NAME = "my_workbook"
def find_execution():
executions = client.executions.list(WB_NAME)
if len(executions) == 0:
return None
for e in executions:
if e.state == "RUNNING":
return e
return None
def find_task(execution_id):
tasks = client.tasks.list(WB_NAME, execution_id)
if len(tasks) == 0:
return None
for t in tasks:
if t.state == "RUNNING":
return t
return None
execution = find_execution()
if not execution:
print "Unable to find running executions."
sys.exit(0)
print "Updating execution: %s" % execution
task = find_task(execution.id)
if not task:
print "Unable to find running tasks for execution: %s" % execution
sys.exit(0)
print "Setting task to SUCCESS state: %s" % task
task = client.tasks.update(WB_NAME, execution.id, task.id, "SUCCESS")
print "Updated task: %s" % task
execution = client.executions.get(WB_NAME, task.execution_id)
print "Updated execution: %s" % execution

22
scripts/test.yaml Normal file
View File

@ -0,0 +1,22 @@
Services:
MyRest:
type: REST_API
parameters:
baseUrl: http://localhost:8989/v1/
actions:
my-action:
parameters:
url: workbooks
method: GET
Workflow:
tasks:
my_task:
action: MyRest:my-action
# events:
# my_event:
# type: periodic
# tasks: my_task
# parameters:
# cron-pattern: "* * * * *"

View File

@ -0,0 +1,33 @@
from mistralclient.api import client as cl
client = cl.Client(project_name="mistral",
mistral_url="http://localhost:8989/v1")
WB_NAME = "my_workbook"
TARGET_TASK = "my_task"
wb_list = client.workbooks.list()
wb = None
for item in wb_list:
if item.name == WB_NAME:
wb = item
break
if not wb:
wb = client.workbooks.create(WB_NAME,
description="My test workbook",
tags=["test"])
print "Created workbook: %s" % wb
with open("scripts/test.yaml") as definition_file:
definition = definition_file.read()
client.workbooks.upload_definition(WB_NAME, definition)
print "\nUploaded workbook:\n\"\n%s\"\n" %\
client.workbooks.get_definition(WB_NAME)
execution = client.executions.create(WB_NAME, TARGET_TASK)
print "execution: %s" % execution