Implement REST API v2.0

Change-Id: I6c46f3b691b0e721ca47b29f7a749338397d6dc6
Implements: blueprint mistral-rest-api-2.0
This commit is contained in:
Kirill Izotov 2014-08-28 16:09:17 +07:00
parent 026ac2f207
commit f25d099bcb
24 changed files with 1189 additions and 20 deletions

View File

@ -20,7 +20,8 @@ import pecan
from mistral.api import access_control
from mistral.api.hooks import engine
from mistral import context as ctx
from mistral.db.v1 import api as db_api
from mistral.db.v1 import api as db_api_v1
from mistral.db.v2 import api as db_api_v2
from mistral.services import periodic
from mistral.services import scheduler
@ -47,7 +48,8 @@ def setup_app(config=None, transport=None):
app_conf = dict(config.app)
db_api.setup_db()
db_api_v1.setup_db()
db_api_v2.setup_db()
# TODO(akuznetsov) move this to trigger scheduling to separate process
periodic.setup(transport)

View File

@ -20,6 +20,7 @@ import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.api.controllers.v1 import root as v1_root
from mistral.api.controllers.v2 import root as v2_root
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -50,14 +51,20 @@ class APIVersion(resource.Resource):
class RootController(object):
v1 = v1_root.Controller()
v2 = v2_root.Controller()
@wsme_pecan.wsexpose([APIVersion])
def index(self):
LOG.debug("Fetching API versions.")
host_url = '%s/%s' % (pecan.request.host_url, 'v1')
host_url_v1 = '%s/%s' % (pecan.request.host_url, 'v1')
api_v1 = APIVersion(id='v1.0',
status='CURRENT',
link=resource.Link(href=host_url, target='v1'))
status='SUPPORTED',
link=resource.Link(href=host_url_v1, target='v1'))
return [api_v1]
host_url_v2 = '%s/%s' % (pecan.request.host_url, 'v2')
api_v2 = APIVersion(id='v2.0',
status='CURRENT',
link=resource.Link(href=host_url_v2, target='v2'))
return [api_v1, api_v2]

View File

@ -32,10 +32,6 @@ class RootResource(resource.Resource):
uri = wtypes.text
# TODO(everyone): what else do we need here?
# TODO(everyone): we need to collect all the links from API v1.0
# and provide them.
class Controller(object):
"""API root controller for version 1."""

View File

View File

@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from pecan import rest
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.db.v2 import api as db_api
from mistral.engine1 import rpc
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils
LOG = logging.getLogger(__name__)
class Execution(resource.Resource):
"""Execution resource."""
id = wtypes.text
workflow_name = wtypes.text
state = wtypes.text
# Context is a JSON object but since WSME doesn't support arbitrary
# dictionaries we have to use text type convert to json and back manually.
input = wtypes.text
output = wtypes.text
created_at = wtypes.text
updated_at = wtypes.text
def to_dict(self):
d = super(Execution, self).to_dict()
if d.get('input'):
d['input'] = json.loads(d['input'])
if d.get('output'):
d['output'] = json.loads(d['output'])
return d
@classmethod
def from_dict(cls, d):
e = cls()
for key, val in d.items():
if hasattr(e, key):
# Nonetype check for dictionary must be explicit
if key == 'input' or key == 'output' and val is not None:
val = json.dumps(val)
setattr(e, key, val)
setattr(e, 'workflow_name', d.get('wf_spec', {}).get('name'))
return e
@classmethod
def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow',
state='SUCCESS',
input='{}',
output='{}',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000')
class Executions(resource.Resource):
"""A collection of Execution resources."""
executions = [Execution]
@classmethod
def sample(cls):
return cls(executions=[Execution.sample()])
class ExecutionsController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Execution, wtypes.text)
def get(self, id):
"""Return the specified Execution."""
LOG.debug("Fetch execution [id=%s]" % id)
return Execution.from_dict(db_api.get_execution(id).to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Execution, wtypes.text, body=Execution)
def put(self, id, execution):
"""Update the specified Execution."""
LOG.debug("Update execution [id=%s, execution=%s]" %
(id, execution))
db_model = db_api.update_execution(id, execution.to_dict())
return Execution.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Execution, body=Execution, status_code=201)
def post(self, execution):
"""Create a new Execution."""
LOG.debug("Create execution [execution=%s]" % execution)
engine = rpc.get_engine_client()
result = engine.start_workflow(**execution.to_dict())
return Execution.from_dict(result)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, wtypes.text, status_code=204)
def delete(self, id):
"""Delete the specified Execution."""
LOG.debug("Delete execution [id=%s]" % id)
return db_api.delete_execution(id)
@wsme_pecan.wsexpose(Executions)
def get_all(self):
"""Return all Executions."""
LOG.debug("Fetch executions")
executions = [Execution.from_dict(db_model.to_dict())
for db_model in db_api.get_executions()]
return Executions(executions=executions)

View File

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pecan
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.api.controllers.v2 import execution
from mistral.api.controllers.v2 import task
from mistral.api.controllers.v2 import workbook
from mistral.api.controllers.v2 import workflow
class RootResource(resource.Resource):
"""Root resource for API version 2.
It references all other resources belonging to the API.
"""
uri = wtypes.text
# TODO(everyone): what else do we need here?
# TODO(everyone): we need to collect all the links from API v2.0
# and provide them.
class Controller(object):
"""API root controller for version 2."""
workbooks = workbook.WorkbooksController()
workflows = workflow.WorkflowsController()
executions = execution.ExecutionsController()
tasks = task.TasksController()
@wsme_pecan.wsexpose(RootResource)
def index(self):
return RootResource(uri='%s/%s' % (pecan.request.host_url, 'v2'))

View File

@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from pecan import rest
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.db.v2 import api as db_api
from mistral.engine1 import rpc
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils
from mistral.workflow import base as wf_base
from mistral.workflow import states
LOG = logging.getLogger(__name__)
class Task(resource.Resource):
"""Task resource."""
id = wtypes.text
name = wtypes.text
wf_name = wtypes.text # should probably change model's field name instead
execution_id = wtypes.text
state = wtypes.text
result = wtypes.text
parameters = wtypes.text
output = wtypes.text
created_at = wtypes.text
updated_at = wtypes.text
@classmethod
def from_dict(cls, d):
e = cls()
for key, val in d.items():
if hasattr(e, key):
# Nonetype check for dictionary must be explicit
if val is not None and (
key == 'parameters' or key == 'output'):
val = json.dumps(val)
setattr(e, key, val)
return e
@classmethod
def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000',
wf_name='book',
execution_id='123e4567-e89b-12d3-a456-426655440000',
name='task',
description='tell when you are done',
# TODO(everyone): replace with states.SUCCESS
state='SUCCESS',
tags=['foo', 'fee'],
parameters='{"first_name": "John", "last_name": "Doe"}',
output='{"task": {"build_greeting": '
'{"greeting": "Hello, John Doe!"}}}',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000')
class Tasks(resource.Resource):
"""A collection of tasks."""
tasks = [Task]
@classmethod
def sample(cls):
return cls(tasks=[Task.sample()])
class TasksController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Task, wtypes.text)
def get(self, id):
"""Return the specified task."""
LOG.debug("Fetch task [id=%s]" % id)
db_model = db_api.get_task(id)
return Task.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Task, wtypes.text, body=Task)
def put(self, id, task):
"""Update the specified task."""
LOG.debug("Update task [id=%s, task=%s]" % (id, task))
# Client must provide a valid json. It doesn't necessarily should be an
# object but it should be json complaint so strings have to be escaped.
try:
result = json.loads(task.result)
except ValueError:
raise exc.InvalidResultException()
if task.state == states.ERROR:
raw_result = wf_base.TaskResult(None, result)
else:
raw_result = wf_base.TaskResult(result)
engine = rpc.get_engine_client()
values = engine.on_task_result(id, raw_result)
return Task.from_dict(values)
@wsme_pecan.wsexpose(Tasks)
def get_all(self):
"""Return all tasks within the execution."""
LOG.debug("Fetch tasks")
tasks = [Task.from_dict(db_model.to_dict())
for db_model in db_api.get_tasks()]
return Tasks(tasks=tasks)

View File

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pecan import rest
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.db.v2 import api as db_api
from mistral.openstack.common import log as logging
from mistral.services import workbooks
from mistral.utils import rest_utils
LOG = logging.getLogger(__name__)
SCOPE_TYPES = wtypes.Enum(str, 'private', 'public')
class Workbook(resource.Resource):
"""Workbook resource."""
id = wtypes.text
name = wtypes.text
definition = wtypes.text
tags = [wtypes.text]
scope = SCOPE_TYPES
created_at = wtypes.text
updated_at = wtypes.text
@classmethod
def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000',
name='book',
definition='---',
tags=['large', 'expensive'],
scope='private',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000')
class Workbooks(resource.Resource):
"""A collection of Workbooks."""
workbooks = [Workbook]
@classmethod
def sample(cls):
return cls(workbooks=[Workbook.sample()])
class WorkbooksController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Workbook, wtypes.text)
def get(self, name):
"""Return the named workbook."""
LOG.debug("Fetch workbook [name=%s]" % name)
db_model = db_api.get_workbook(name)
return Workbook.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Workbook, wtypes.text, body=Workbook)
def put(self, name, workbook):
"""Update the named workbook."""
LOG.debug("Update workbook [name=%s, workbook=%s]" % (name, workbook))
db_model = workbooks.update_workbook_v2(name, workbook.to_dict())
return Workbook.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Workbook, body=Workbook, status_code=201)
def post(self, workbook):
"""Create a new workbook."""
LOG.debug("Create workbook [workbook=%s]" % workbook)
db_model = workbooks.create_workbook_v2(workbook.to_dict())
return Workbook.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, wtypes.text, status_code=204)
def delete(self, name):
"""Delete the named workbook."""
LOG.debug("Delete workbook [name=%s]" % name)
db_api.delete_workbook(name)
@wsme_pecan.wsexpose(Workbooks)
def get_all(self):
"""Return all workbooks.
Where project_id is the same as the requestor or
project_id is different but the scope is public.
"""
LOG.debug("Fetch workbooks.")
workbooks_list = [Workbook.from_dict(db_model.to_dict())
for db_model in db_api.get_workbooks()]
return Workbooks(workbooks=workbooks_list)

View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pecan import rest
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.db.v2 import api as db_api
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils
LOG = logging.getLogger(__name__)
SCOPE_TYPES = wtypes.Enum(str, 'private', 'public')
class Workflow(resource.Resource):
"""Workflow resource."""
id = wtypes.text
name = wtypes.text
definition = wtypes.text
tags = [wtypes.text]
scope = SCOPE_TYPES
created_at = wtypes.text
updated_at = wtypes.text
@classmethod
def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000',
name='flow',
definition='---',
tags=['large', 'expensive'],
scope='private',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000')
class Workflows(resource.Resource):
"""A collection of Workflows."""
workflows = [Workflow]
@classmethod
def sample(cls):
return cls(workflows=[Workflow.sample()])
class WorkflowsController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Workflow, wtypes.text)
def get(self, name):
"""Return the named workflow."""
LOG.debug("Fetch workflow [name=%s]" % name)
db_model = db_api.get_workflow(name)
return Workflow.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Workflow, wtypes.text, body=Workflow)
def put(self, name, workflow):
"""Update the named workflow."""
LOG.debug("Update workflow [name=%s, workflow=%s]" % (name, workflow))
db_model = db_api.update_workflow(name, workflow.to_dict())
return Workflow.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Workflow, body=Workflow, status_code=201)
def post(self, workflow):
"""Create a new workflow."""
LOG.debug("Create workflow [workflow=%s]" % workflow)
db_model = db_api.create_workflow(workflow.to_dict())
return Workflow.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, wtypes.text, status_code=204)
def delete(self, name):
"""Delete the named workflow."""
LOG.debug("Delete workflow [name=%s]" % name)
db_api.delete_workflow(name)
@wsme_pecan.wsexpose(Workflows)
def get_all(self):
"""Return all workflows.
Where project_id is the same as the requestor or
project_id is different but the scope is public.
"""
LOG.debug("Fetch workflows.")
workflows_list = [Workflow.from_dict(db_model.to_dict())
for db_model in db_api.get_workflows()]
return Workflows(workflows=workflows_list)

View File

@ -43,6 +43,7 @@ from mistral import config
from mistral import context as ctx
from mistral import engine
from mistral.engine import executor
from mistral.engine1 import rpc
from mistral.openstack.common import log as logging
@ -57,7 +58,10 @@ def launch_executor(transport):
# Since engine and executor are tightly coupled, use the engine
# configuration to decide which executor to get.
endpoints = [executor.get_executor(cfg.CONF.engine.engine, transport)]
endpoints = [
executor.get_executor(cfg.CONF.engine.engine, transport),
rpc.get_executor_server()
]
server = messaging.get_rpc_server(
transport,
@ -77,7 +81,10 @@ def launch_engine(transport):
server=cfg.CONF.engine.host
)
endpoints = [engine.get_engine(cfg.CONF.engine.engine, transport)]
endpoints = [
engine.get_engine(cfg.CONF.engine.engine, transport),
rpc.get_engine_server()
]
server = messaging.get_rpc_server(
transport,

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import sqlalchemy as sa
from sqlalchemy.orm import relationship
@ -104,6 +105,16 @@ class Task(mb.MistralModelBase):
execution_id = sa.Column(sa.String(36), sa.ForeignKey('executions_v2.id'))
execution = relationship('Execution', backref="tasks", lazy='joined')
def to_dict(self):
d = super(Task, self).to_dict()
d['result'] = json.dumps(
d['output'].get('task', {}).get(d['name'], {})
if d['output'] else {}
)
return d
class DelayedCall(mb.MistralModelBase):
"""Contains info about delayed calls."""

View File

@ -77,7 +77,7 @@ def get_executor_server():
def get_executor_client():
global _EXECUTOR_CLIENT
if not _ENGINE_CLIENT:
if not _EXECUTOR_CLIENT:
_EXECUTOR_CLIENT = ExecutorClient(get_transport())
return _EXECUTOR_CLIENT
@ -185,17 +185,16 @@ class EngineClient(base.Engine):
serializer=serializer
)
def start_workflow(self, workflow_name, workflow_input, **params):
def start_workflow(self, workflow_name, workflow_input=None, **params):
"""Starts workflow sending a request to engine over RPC.
:return: Workflow execution.
"""
return self._client.call(
auth_ctx.ctx(),
'start_workflow',
workflow_name=workflow_name,
workflow_input=workflow_input,
workflow_input=workflow_input or {},
params=params
)

View File

@ -97,3 +97,8 @@ class DSLParsingException(MistralException):
class InvalidModelException(MistralException):
http_code = 400
message = "Wrong entity definition"
class InvalidResultException(MistralException):
http_code = 400
message = "Unable to parse result"

View File

@ -128,4 +128,4 @@ def _update_specification(values):
if 'definition' in values:
spec = spec_parser.get_workbook_spec_from_yaml(values['definition'])
values['spec'] = spec.to_dict()
values['spec'] = spec.to_dict()

View File

@ -28,6 +28,6 @@ class TestRootController(base.FunctionalTest):
data = jsonutils.loads(resp.body.decode())
self.assertEqual(data[0]['id'], 'v1.0')
self.assertEqual(data[0]['status'], 'CURRENT')
self.assertEqual(data[0]['status'], 'SUPPORTED')
self.assertEqual(data[0]['link'], {'href': 'http://localhost/v1',
'target': 'v1'})

View File

View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import mock
from webtest import app as webtest_app
from mistral.api.controllers.v2 import execution
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine1 import rpc
from mistral import exceptions as exc
from mistral.tests.api import base
from mistral.workflow import states
EXEC_DB = models.Execution(id='123',
wf_spec={'name': 'some'},
state=states.RUNNING,
input={},
output={},
created_at=datetime.datetime(1970, 1, 1),
updated_at=datetime.datetime(1970, 1, 1))
EXEC = {
'id': '123',
'input': '{}',
'output': '{}',
'state': 'RUNNING',
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00',
'workflow_name': 'some'
}
UPDATED_EXEC_DB = copy.copy(EXEC_DB)
UPDATED_EXEC_DB['state'] = 'STOPPED'
UPDATED_EXEC = copy.copy(EXEC)
UPDATED_EXEC['state'] = 'STOPPED'
MOCK_EXECUTION = mock.MagicMock(return_value=EXEC_DB)
MOCK_EXECUTIONS = mock.MagicMock(return_value=[EXEC_DB])
MOCK_UPDATED_EXECUTION = mock.MagicMock(return_value=UPDATED_EXEC_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException())
class TestExecutionsController(base.FunctionalTest):
@mock.patch.object(db_api, 'get_execution', MOCK_EXECUTION)
def test_get(self):
resp = self.app.get('/v2/executions/123')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(EXEC, resp.json)
@mock.patch.object(db_api, 'get_execution', MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/executions/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, 'update_execution', MOCK_UPDATED_EXECUTION)
def test_put(self):
resp = self.app.put_json('/v2/executions/123', UPDATED_EXEC)
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_EXEC, resp.json)
@mock.patch.object(db_api, 'update_execution', MOCK_NOT_FOUND)
def test_put_not_found(self):
resp = self.app.put_json('/v2/executions/123', dict(state='STOPPED'),
expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(rpc.EngineClient, 'start_workflow')
def test_post(self, f):
f.return_value = EXEC_DB.to_dict()
resp = self.app.post_json('/v2/executions', EXEC)
self.assertEqual(resp.status_int, 201)
self.assertDictEqual(EXEC, resp.json)
f.assert_called_once_with(**execution.Execution(**EXEC).to_dict())
@mock.patch.object(rpc.EngineClient, 'start_workflow', MOCK_ACTION_EXC)
def test_post_throws_exception(self):
context = self.assertRaises(webtest_app.AppError, self.app.post_json,
'/v2/executions',
EXEC)
self.assertIn('Bad response: 400', context.message)
@mock.patch.object(db_api, 'delete_execution', MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/executions/123')
self.assertEqual(resp.status_int, 204)
@mock.patch.object(db_api, 'delete_execution', MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete('/v2/executions/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, 'get_executions', MOCK_EXECUTIONS)
def test_get_all(self):
resp = self.app.get('/v2/executions')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['executions']), 1)
self.assertDictEqual(EXEC, resp.json['executions'][0])
@mock.patch.object(db_api, 'get_executions', MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/executions')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['executions']), 0)

View File

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.openstack.common import jsonutils
from mistral.tests.api import base
class TestRootController(base.FunctionalTest):
def test_index(self):
resp = self.app.get('/', headers={'Accept': 'application/json'})
self.assertEqual(resp.status_int, 200)
data = jsonutils.loads(resp.body.decode())
self.assertEqual(data[1]['id'], 'v2.0')
self.assertEqual(data[1]['status'], 'CURRENT')
self.assertEqual(data[1]['link'], {'href': 'http://localhost/v2',
'target': 'v2'})

View File

@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import json
import mock
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine1 import rpc
from mistral import exceptions as exc
from mistral.tests.api import base
from mistral.workflow import base as wf
from mistral.workflow import states
# TODO(everyone): later we need additional tests verifying all the errors etc.
TASK_DB = models.Task(id='123',
name='task',
wf_name='flow',
spec={},
action_spec={},
state=states.RUNNING,
tags=['a', 'b'],
in_context={},
parameters={},
output={},
runtime_context={},
execution_id='123',
created_at=datetime.datetime(1970, 1, 1),
updated_at=datetime.datetime(1970, 1, 1))
TASK = {
'id': '123',
'name': 'task',
'wf_name': 'flow',
'state': 'RUNNING',
'result': '{}',
'parameters': '{}',
'output': '{}',
'execution_id': '123',
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00'
}
UPDATED_TASK_DB = copy.copy(TASK_DB)
UPDATED_TASK_DB['state'] = 'SUCCESS'
UPDATED_TASK = copy.copy(TASK)
UPDATED_TASK['state'] = 'SUCCESS'
UPDATED_TASK_RES = wf.TaskResult(json.loads(UPDATED_TASK['result']))
ERROR_TASK_DB = copy.copy(TASK_DB)
ERROR_TASK_DB['state'] = 'ERROR'
ERROR_TASK = copy.copy(TASK)
ERROR_TASK['state'] = 'ERROR'
ERROR_TASK_RES = wf.TaskResult(None, json.loads(ERROR_TASK['result']))
BROKEN_TASK = copy.copy(TASK)
BROKEN_TASK['result'] = 'string not escaped'
MOCK_TASK = mock.MagicMock(return_value=TASK_DB)
MOCK_TASKS = mock.MagicMock(return_value=[TASK_DB])
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
class TestTasksController(base.FunctionalTest):
@mock.patch.object(db_api, 'get_task', MOCK_TASK)
def test_get(self):
resp = self.app.get('/v2/tasks/123')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(TASK, resp.json)
@mock.patch.object(db_api, 'get_task', MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/tasks/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(rpc.EngineClient, 'on_task_result')
def test_put(self, f):
f.return_value = UPDATED_TASK_DB.to_dict()
resp = self.app.put_json('/v2/tasks/123', UPDATED_TASK)
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_TASK, resp.json)
f.assert_called_once_with(UPDATED_TASK['id'], UPDATED_TASK_RES)
@mock.patch.object(rpc.EngineClient, 'on_task_result')
def test_put_error(self, f):
f.return_value = ERROR_TASK_DB.to_dict()
resp = self.app.put_json('/v2/tasks/123', ERROR_TASK)
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(ERROR_TASK, resp.json)
f.assert_called_once_with(ERROR_TASK['id'], ERROR_TASK_RES)
@mock.patch.object(rpc.EngineClient, 'on_task_result', MOCK_NOT_FOUND)
def test_put_no_task(self):
resp = self.app.put_json('/v2/tasks/123', UPDATED_TASK,
expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(rpc.EngineClient, 'on_task_result')
def test_put_bad_result(self, f):
resp = self.app.put_json('/v2/tasks/123', BROKEN_TASK,
expect_errors=True)
self.assertEqual(resp.status_int, 400)
@mock.patch.object(db_api, 'get_tasks', MOCK_TASKS)
def test_get_all(self):
resp = self.app.get('/v2/tasks')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['tasks']), 1)
self.assertDictEqual(TASK, resp.json['tasks'][0])
@mock.patch.object(db_api, 'get_tasks', MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/tasks')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['tasks']), 0)

View File

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import mock
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import workbooks
from mistral.tests.api import base
WORKBOOK_DB = models.Workbook(id='123',
name='book',
definition='---',
tags=['deployment', 'demo'],
scope="public",
created_at=datetime.datetime(1970, 1, 1),
updated_at=datetime.datetime(1970, 1, 1))
WORKBOOK = {
'id': '123',
'name': 'book',
'definition': '---',
'tags': ['deployment', 'demo'],
'scope': 'public',
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00'
}
UPDATED_WORKBOOK_DB = copy.copy(WORKBOOK_DB)
UPDATED_WORKBOOK_DB['definition'] = '---\nVersion: 2.0'
UPDATED_WORKBOOK = copy.copy(WORKBOOK)
UPDATED_WORKBOOK['definition'] = '---\nVersion: 2.0'
MOCK_WORKBOOK = mock.MagicMock(return_value=WORKBOOK_DB)
MOCK_WORKBOOKS = mock.MagicMock(return_value=[WORKBOOK_DB])
MOCK_UPDATED_WORKBOOK = mock.MagicMock(return_value=UPDATED_WORKBOOK_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
class TestWorkbooksController(base.FunctionalTest):
@mock.patch.object(db_api, "get_workbook", MOCK_WORKBOOK)
def test_get(self):
resp = self.app.get('/v2/workbooks/123')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(WORKBOOK, resp.json)
@mock.patch.object(db_api, "get_workbook", MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/workbooks/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(workbooks, "update_workbook_v2", MOCK_UPDATED_WORKBOOK)
def test_put(self):
resp = self.app.put_json('/v2/workbooks/123', UPDATED_WORKBOOK)
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_WORKBOOK, resp.json)
@mock.patch.object(workbooks, "update_workbook_v2", MOCK_NOT_FOUND)
def test_put_not_found(self):
resp = self.app.put_json('/v2/workbooks/123', UPDATED_WORKBOOK,
expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(workbooks, "create_workbook_v2", MOCK_WORKBOOK)
def test_post(self):
resp = self.app.post_json('/v2/workbooks', WORKBOOK)
self.assertEqual(resp.status_int, 201)
self.assertDictEqual(WORKBOOK, resp.json)
@mock.patch.object(workbooks, "create_workbook_v2", MOCK_DUPLICATE)
def test_post_dup(self):
resp = self.app.post_json('/v2/workbooks', WORKBOOK,
expect_errors=True)
self.assertEqual(resp.status_int, 409)
@mock.patch.object(db_api, "delete_workbook", MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/workbooks/123')
self.assertEqual(resp.status_int, 204)
@mock.patch.object(db_api, "delete_workbook", MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete('/v2/workbooks/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "get_workbooks", MOCK_WORKBOOKS)
def test_get_all(self):
resp = self.app.get('/v2/workbooks')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['workbooks']), 1)
self.assertDictEqual(WORKBOOK, resp.json['workbooks'][0])
@mock.patch.object(db_api, "get_workbooks", MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/workbooks')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['workbooks']), 0)

View File

@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import mock
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.tests.api import base
WORKFLOW_DB = models.Workflow(id='123',
name='flow',
definition='---',
tags=['deployment', 'demo'],
scope="public",
created_at=datetime.datetime(1970, 1, 1),
updated_at=datetime.datetime(1970, 1, 1))
WORKFLOW = {
'id': '123',
'name': 'flow',
'definition': '---',
'tags': ['deployment', 'demo'],
'scope': 'public',
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00'
}
UPDATED_WORKFLOW_DB = copy.copy(WORKFLOW_DB)
UPDATED_WORKFLOW_DB['definition'] = '---\nVersion: 2.0'
UPDATED_WORKFLOW = copy.copy(WORKFLOW)
UPDATED_WORKFLOW['definition'] = '---\nVersion: 2.0'
MOCK_WORKFLOW = mock.MagicMock(return_value=WORKFLOW_DB)
MOCK_WORKFLOWS = mock.MagicMock(return_value=[WORKFLOW_DB])
MOCK_UPDATED_WORKFLOW = mock.MagicMock(return_value=UPDATED_WORKFLOW_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
class TestWorkflowsController(base.FunctionalTest):
@mock.patch.object(db_api, "get_workflow", MOCK_WORKFLOW)
def test_get(self):
resp = self.app.get('/v2/workflows/123')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(WORKFLOW, resp.json)
@mock.patch.object(db_api, "get_workflow", MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/workflows/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "update_workflow", MOCK_UPDATED_WORKFLOW)
def test_put(self):
resp = self.app.put_json('/v2/workflows/123', UPDATED_WORKFLOW)
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_WORKFLOW, resp.json)
@mock.patch.object(db_api, "update_workflow", MOCK_NOT_FOUND)
def test_put_not_found(self):
resp = self.app.put_json('/v2/workflows/123', UPDATED_WORKFLOW,
expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "create_workflow", MOCK_WORKFLOW)
def test_post(self):
resp = self.app.post_json('/v2/workflows', WORKFLOW)
self.assertEqual(resp.status_int, 201)
self.assertDictEqual(WORKFLOW, resp.json)
@mock.patch.object(db_api, "create_workflow", MOCK_DUPLICATE)
def test_post_dup(self):
resp = self.app.post_json('/v2/workflows', WORKFLOW,
expect_errors=True)
self.assertEqual(resp.status_int, 409)
@mock.patch.object(db_api, "delete_workflow", MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/workflows/123')
self.assertEqual(resp.status_int, 204)
@mock.patch.object(db_api, "delete_workflow", MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete('/v2/workflows/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "get_workflows", MOCK_WORKFLOWS)
def test_get_all(self):
resp = self.app.get('/v2/workflows')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['workflows']), 1)
self.assertDictEqual(WORKFLOW, resp.json['workflows'][0])
@mock.patch.object(db_api, "get_workflows", MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/workflows')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['workflows']), 0)

View File

@ -29,6 +29,7 @@ import time
from mistral import context as auth_context
from mistral.db.sqlalchemy import base as db_sa_base
from mistral.db.v1 import api as db_api_v1
from mistral.db.v2 import api as db_api_v2
from mistral import engine
from mistral.engine import executor
from mistral.openstack.common import log as logging
@ -173,6 +174,7 @@ class DbTestCase(BaseTest):
cfg.CONF.set_default('connection', 'sqlite://', group='database')
db_api_v1.setup_db()
db_api_v2.setup_db()
self.addCleanup(db_api_v1.drop_db)

View File

@ -54,7 +54,7 @@ def _get_spec_version(spec_dict):
elif 'version' in spec_dict:
ver = spec_dict['version']
if ver not in ALL_VERSIONS:
if str(ver) not in ALL_VERSIONS:
raise exc.DSLParsingException('Unsupported DSL version: %s' % ver)
return ver

View File

@ -164,7 +164,8 @@ class TaskResult(object):
self.error = error
def __repr__(self):
return 'TaskResult [data=%s, error=%s]' % (self.data, self.error)
return 'TaskResult [data=%s, error=%s]' % \
(repr(self.data), repr(self.error))
def is_error(self):
return self.error is not None
@ -172,6 +173,9 @@ class TaskResult(object):
def is_success(self):
return not self.is_error()
def __eq__(self, other):
return self.data == other.data and self.error == other.error
class FlowControl(object):
"""Flow control structure.