Add pagination support for workflows query API
Add query params for workflow list REST API: * limit: return a maximun number of items at a time, default is None, the query result will include all the resource items, which is backward compatible. * marker: the ID of the last item in the previous list. * sort_keys: columns to sort results by. Default: created_at. * sort_dirs: directions to sort corresponding to sort_keys, "asc" or "desc" can be choosed. Default: asc. The length of sort_dirs can be equal or less than that of sort_keys. Change-Id: Ie73d4457193999555ce9886d4de1297b4d0bc51d Partially-Implements: blueprint mistral-query-enhancement
This commit is contained in:
parent
3326affb89
commit
dba860a15d
@ -1,5 +1,3 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
#
|
|
||||||
# Copyright 2013 - Mirantis, Inc.
|
# Copyright 2013 - Mirantis, Inc.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
@ -67,6 +65,40 @@ class Resource(wtypes.Base):
|
|||||||
class ResourceList(Resource):
|
class ResourceList(Resource):
|
||||||
"""Resource containing the list of other resources."""
|
"""Resource containing the list of other resources."""
|
||||||
|
|
||||||
|
next = wtypes.text
|
||||||
|
"""A link to retrieve the next subset of the resource list"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def collection(self):
|
||||||
|
return getattr(self, self._type)
|
||||||
|
|
||||||
|
def has_next(self, limit):
|
||||||
|
"""Return whether resources has more items."""
|
||||||
|
return len(self.collection) and len(self.collection) == limit
|
||||||
|
|
||||||
|
def get_next(self, limit, url=None, **kwargs):
|
||||||
|
"""Return a link to the next subset of the resources."""
|
||||||
|
if not self.has_next(limit):
|
||||||
|
return wtypes.Unset
|
||||||
|
|
||||||
|
q_args = ''.join(
|
||||||
|
['%s=%s&' % (key, value) for key, value in kwargs.items()]
|
||||||
|
)
|
||||||
|
|
||||||
|
resource_args = '?%(args)slimit=%(limit)d&marker=%(marker)s' % {
|
||||||
|
'args': q_args,
|
||||||
|
'limit': limit,
|
||||||
|
'marker': self.collection[-1].id
|
||||||
|
}
|
||||||
|
|
||||||
|
next_link = "%(host_url)s/v2/%(resource)s%(args)s" % {
|
||||||
|
'host_url': url,
|
||||||
|
'resource': self._type,
|
||||||
|
'args': resource_args
|
||||||
|
}
|
||||||
|
|
||||||
|
return next_link
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
d = {}
|
d = {}
|
||||||
|
|
||||||
|
99
mistral/api/controllers/v2/types.py
Normal file
99
mistral/api/controllers/v2/types.py
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
# Copyright 2015 Huawei Technologies Co., Ltd.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from oslo_utils import uuidutils
|
||||||
|
import six
|
||||||
|
from wsme import types as wtypes
|
||||||
|
|
||||||
|
from mistral import exceptions as exc
|
||||||
|
|
||||||
|
|
||||||
|
class ListType(wtypes.UserType):
|
||||||
|
"""A simple list type."""
|
||||||
|
|
||||||
|
basetype = wtypes.text
|
||||||
|
name = 'list'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate(value):
|
||||||
|
"""Validate and convert the input to a ListType.
|
||||||
|
|
||||||
|
:param value: A comma separated string of values
|
||||||
|
:returns: A list of values.
|
||||||
|
"""
|
||||||
|
items = [v.strip().lower() for v in six.text_type(value).split(',')]
|
||||||
|
|
||||||
|
# filter() to remove empty items.
|
||||||
|
return filter(None, items)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def frombasetype(value):
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return ListType.validate(value)
|
||||||
|
|
||||||
|
|
||||||
|
class UniqueListType(ListType):
|
||||||
|
"""A simple list type with no duplicate items."""
|
||||||
|
|
||||||
|
name = 'uniquelist'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate(value):
|
||||||
|
"""Validate and convert the input to a UniqueListType.
|
||||||
|
|
||||||
|
:param value: A comma separated string of values.
|
||||||
|
:returns: A list with no duplicate items.
|
||||||
|
"""
|
||||||
|
items = ListType.validate(value)
|
||||||
|
|
||||||
|
seen = set()
|
||||||
|
|
||||||
|
return [x for x in items if not (x in seen or seen.add(x))]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def frombasetype(value):
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return UniqueListType.validate(value)
|
||||||
|
|
||||||
|
|
||||||
|
class UuidType(wtypes.UserType):
|
||||||
|
"""A simple UUID type.
|
||||||
|
|
||||||
|
The builtin UuidType class in wsme.types doesn't work properly with pecan.
|
||||||
|
"""
|
||||||
|
|
||||||
|
basetype = wtypes.text
|
||||||
|
name = 'uuid'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate(value):
|
||||||
|
if not uuidutils.is_uuid_like(value):
|
||||||
|
raise exc.InputException(
|
||||||
|
"Expected a uuid but received %s." % value
|
||||||
|
)
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def frombasetype(value):
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return UuidType.validate(value)
|
||||||
|
|
||||||
|
|
||||||
|
uuid = UuidType()
|
||||||
|
list = ListType()
|
||||||
|
uniquelist = UniqueListType()
|
@ -1,5 +1,6 @@
|
|||||||
# Copyright 2013 - Mirantis, Inc.
|
# Copyright 2013 - Mirantis, Inc.
|
||||||
# Copyright 2015 - StackStorm, Inc.
|
# Copyright 2015 - StackStorm, Inc.
|
||||||
|
# Copyright 2015 Huawei Technologies Co., Ltd.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -21,6 +22,7 @@ from wsme import types as wtypes
|
|||||||
import wsmeext.pecan as wsme_pecan
|
import wsmeext.pecan as wsme_pecan
|
||||||
|
|
||||||
from mistral.api.controllers import resource
|
from mistral.api.controllers import resource
|
||||||
|
from mistral.api.controllers.v2 import types
|
||||||
from mistral.api.controllers.v2 import validation
|
from mistral.api.controllers.v2 import validation
|
||||||
from mistral.api.hooks import content_type as ct_hook
|
from mistral.api.hooks import content_type as ct_hook
|
||||||
from mistral.db.v2 import api as db_api
|
from mistral.db.v2 import api as db_api
|
||||||
@ -88,9 +90,29 @@ class Workflows(resource.ResourceList):
|
|||||||
|
|
||||||
workflows = [Workflow]
|
workflows = [Workflow]
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self._type = 'workflows'
|
||||||
|
|
||||||
|
super(Workflows, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def convert_with_links(workflows, limit, url=None, **kwargs):
|
||||||
|
wf_collection = Workflows()
|
||||||
|
wf_collection.workflows = workflows
|
||||||
|
wf_collection.next = wf_collection.get_next(limit, url=url, **kwargs)
|
||||||
|
|
||||||
|
return wf_collection
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def sample(cls):
|
def sample(cls):
|
||||||
return cls(workflows=[Workflow.sample()])
|
workflows_sample = cls()
|
||||||
|
workflows_sample.workflows = [Workflow.sample()]
|
||||||
|
workflows_sample.next = "http://localhost:8989/v2/workflows?" \
|
||||||
|
"sort_keys=id,name&" \
|
||||||
|
"sort_dirs=asc,desc&limit=10&" \
|
||||||
|
"marker=123e4567-e89b-12d3-a456-426655440000"
|
||||||
|
|
||||||
|
return workflows_sample
|
||||||
|
|
||||||
|
|
||||||
class WorkflowsController(rest.RestController, hooks.HookController):
|
class WorkflowsController(rest.RestController, hooks.HookController):
|
||||||
@ -159,16 +181,50 @@ class WorkflowsController(rest.RestController, hooks.HookController):
|
|||||||
|
|
||||||
db_api.delete_workflow_definition(name)
|
db_api.delete_workflow_definition(name)
|
||||||
|
|
||||||
@wsme_pecan.wsexpose(Workflows)
|
@rest_utils.wrap_pecan_controller_exception
|
||||||
def get_all(self):
|
@wsme_pecan.wsexpose(Workflows, types.uuid, int, types.uniquelist,
|
||||||
"""Return all workflows.
|
types.list)
|
||||||
|
def get_all(self, marker=None, limit=None, sort_keys='created_at',
|
||||||
|
sort_dirs='asc'):
|
||||||
|
"""Return a list of workflows.
|
||||||
|
|
||||||
|
:param marker: Optional. Pagination marker for large data sets.
|
||||||
|
:param limit: Optional. Maximum number of resources to return in a
|
||||||
|
single result. Default value is None for backward
|
||||||
|
compatability.
|
||||||
|
:param sort_keys: Optional. Columns to sort results by.
|
||||||
|
Default: created_at.
|
||||||
|
:param sort_dirs: Optional. Directions to sort corresponding to
|
||||||
|
sort_keys, "asc" or "desc" can be choosed.
|
||||||
|
Default: asc.
|
||||||
|
|
||||||
Where project_id is the same as the requester or
|
Where project_id is the same as the requester or
|
||||||
project_id is different but the scope is public.
|
project_id is different but the scope is public.
|
||||||
"""
|
"""
|
||||||
LOG.info("Fetch workflows.")
|
LOG.info("Fetch workflows. marker=%s, limit=%s, sort_keys=%s, "
|
||||||
|
"sort_dirs=%s", marker, limit, sort_keys, sort_dirs)
|
||||||
|
|
||||||
|
rest_utils.validate_query_params(limit, sort_keys, sort_dirs)
|
||||||
|
|
||||||
|
marker_obj = None
|
||||||
|
|
||||||
|
if marker:
|
||||||
|
marker_obj = db_api.get_workflow_definition_by_id(marker)
|
||||||
|
|
||||||
|
db_workflows = db_api.get_workflow_definitions(
|
||||||
|
limit=limit,
|
||||||
|
marker=marker_obj,
|
||||||
|
sort_keys=sort_keys,
|
||||||
|
sort_dirs=sort_dirs
|
||||||
|
)
|
||||||
|
|
||||||
workflows_list = [Workflow.from_dict(db_model.to_dict())
|
workflows_list = [Workflow.from_dict(db_model.to_dict())
|
||||||
for db_model in db_api.get_workflow_definitions()]
|
for db_model in db_workflows]
|
||||||
|
|
||||||
return Workflows(workflows=workflows_list)
|
return Workflows.convert_with_links(
|
||||||
|
workflows_list,
|
||||||
|
limit,
|
||||||
|
pecan.request.host_url,
|
||||||
|
sort_keys=','.join(sort_keys),
|
||||||
|
sort_dirs=','.join(sort_dirs)
|
||||||
|
)
|
||||||
|
@ -107,13 +107,26 @@ def get_workflow_definition(name):
|
|||||||
return IMPL.get_workflow_definition(name)
|
return IMPL.get_workflow_definition(name)
|
||||||
|
|
||||||
|
|
||||||
|
def get_workflow_definition_by_id(id):
|
||||||
|
return IMPL.get_workflow_definition_by_id(id)
|
||||||
|
|
||||||
|
|
||||||
def load_workflow_definition(name):
|
def load_workflow_definition(name):
|
||||||
"""Unlike get_workflow_definition this method is allowed to return None."""
|
"""Unlike get_workflow_definition this method is allowed to return None."""
|
||||||
return IMPL.load_workflow_definition(name)
|
return IMPL.load_workflow_definition(name)
|
||||||
|
|
||||||
|
|
||||||
def get_workflow_definitions():
|
# NOTE(xylan): We just leave filter param here for future usage
|
||||||
return IMPL.get_workflow_definitions()
|
def get_workflow_definitions(filters=None, limit=None, marker=None,
|
||||||
|
sort_keys=None, sort_dirs=None, **kwargs):
|
||||||
|
return IMPL.get_workflow_definitions(
|
||||||
|
filters=filters,
|
||||||
|
limit=limit,
|
||||||
|
marker=marker,
|
||||||
|
sort_keys=sort_keys,
|
||||||
|
sort_dirs=sort_dirs,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def create_workflow_definition(values):
|
def create_workflow_definition(values):
|
||||||
|
@ -19,6 +19,7 @@ import sys
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_db import exception as db_exc
|
from oslo_db import exception as db_exc
|
||||||
from oslo_db import sqlalchemy as oslo_sqlalchemy
|
from oslo_db import sqlalchemy as oslo_sqlalchemy
|
||||||
|
from oslo_db.sqlalchemy import utils as db_utils
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
@ -115,6 +116,23 @@ def _secure_query(model):
|
|||||||
return query
|
return query
|
||||||
|
|
||||||
|
|
||||||
|
def _paginate_query(model, limit=None, marker=None, sort_keys=None,
|
||||||
|
sort_dirs=None, query=None):
|
||||||
|
if not query:
|
||||||
|
query = _secure_query(model)
|
||||||
|
|
||||||
|
query = db_utils.paginate_query(
|
||||||
|
query,
|
||||||
|
model,
|
||||||
|
limit,
|
||||||
|
sort_keys if sort_keys else {},
|
||||||
|
marker=marker,
|
||||||
|
sort_dirs=sort_dirs
|
||||||
|
)
|
||||||
|
|
||||||
|
return query.all()
|
||||||
|
|
||||||
|
|
||||||
def _delete_all(model, session=None, **kwargs):
|
def _delete_all(model, session=None, **kwargs):
|
||||||
_secure_query(model).filter_by(**kwargs).delete()
|
_secure_query(model).filter_by(**kwargs).delete()
|
||||||
|
|
||||||
@ -166,7 +184,7 @@ def create_workbook(values, session=None):
|
|||||||
try:
|
try:
|
||||||
wb.save(session=session)
|
wb.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for WorkbookDefinition: %s" % e.columns
|
"Duplicate entry for WorkbookDefinition: %s" % e.columns
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -216,6 +234,7 @@ def delete_workbooks(**kwargs):
|
|||||||
|
|
||||||
# Workflow definitions.
|
# Workflow definitions.
|
||||||
|
|
||||||
|
|
||||||
def get_workflow_definition(name):
|
def get_workflow_definition(name):
|
||||||
wf_def = _get_workflow_definition(name)
|
wf_def = _get_workflow_definition(name)
|
||||||
|
|
||||||
@ -227,12 +246,40 @@ def get_workflow_definition(name):
|
|||||||
return wf_def
|
return wf_def
|
||||||
|
|
||||||
|
|
||||||
|
def get_workflow_definition_by_id(id):
|
||||||
|
wf_def = _get_workflow_definition_by_id(id)
|
||||||
|
|
||||||
|
if not wf_def:
|
||||||
|
raise exc.NotFoundException(
|
||||||
|
"Workflow not found [workflow_id=%s]" % id
|
||||||
|
)
|
||||||
|
|
||||||
|
return wf_def
|
||||||
|
|
||||||
|
|
||||||
def load_workflow_definition(name):
|
def load_workflow_definition(name):
|
||||||
return _get_workflow_definition(name)
|
return _get_workflow_definition(name)
|
||||||
|
|
||||||
|
|
||||||
def get_workflow_definitions(**kwargs):
|
# NOTE(xylan): We just leave filter param here for future usage
|
||||||
return _get_collection_sorted_by_name(models.WorkflowDefinition, **kwargs)
|
def get_workflow_definitions(filters=None, limit=None, marker=None,
|
||||||
|
sort_keys=None, sort_dirs=None, **kwargs):
|
||||||
|
query = _secure_query(models.WorkflowDefinition)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return _paginate_query(
|
||||||
|
models.WorkflowDefinition,
|
||||||
|
limit,
|
||||||
|
marker,
|
||||||
|
sort_keys,
|
||||||
|
sort_dirs,
|
||||||
|
query
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
raise exc.DBQueryEntryException(
|
||||||
|
"Failed when quering database, error type: %s, "
|
||||||
|
"error message: %s" % (e.__class__.__name__, e.message)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@b.session_aware()
|
@b.session_aware()
|
||||||
@ -244,7 +291,7 @@ def create_workflow_definition(values, session=None):
|
|||||||
try:
|
try:
|
||||||
wf_def.save(session=session)
|
wf_def.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for WorkflowDefinition: %s" % e.columns
|
"Duplicate entry for WorkflowDefinition: %s" % e.columns
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -293,6 +340,10 @@ def _get_workflow_definition(name):
|
|||||||
return _get_db_object_by_name(models.WorkflowDefinition, name)
|
return _get_db_object_by_name(models.WorkflowDefinition, name)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_workflow_definition_by_id(id):
|
||||||
|
return _get_db_object_by_id(models.WorkflowDefinition, id)
|
||||||
|
|
||||||
|
|
||||||
# Action definitions.
|
# Action definitions.
|
||||||
|
|
||||||
def get_action_definition(name):
|
def get_action_definition(name):
|
||||||
@ -323,7 +374,7 @@ def create_action_definition(values, session=None):
|
|||||||
try:
|
try:
|
||||||
a_def.save(session=session)
|
a_def.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for action %s: %s" % (a_def.name, e.columns)
|
"Duplicate entry for action %s: %s" % (a_def.name, e.columns)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -406,7 +457,7 @@ def create_execution(values, session=None):
|
|||||||
try:
|
try:
|
||||||
ex.save(session=session)
|
ex.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for Execution: %s" % e.columns
|
"Duplicate entry for Execution: %s" % e.columns
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -492,7 +543,7 @@ def create_action_execution(values, session=None):
|
|||||||
try:
|
try:
|
||||||
a_ex.save(session=session)
|
a_ex.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for ActionExecution: %s" % e.columns
|
"Duplicate entry for ActionExecution: %s" % e.columns
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -578,7 +629,7 @@ def create_workflow_execution(values, session=None):
|
|||||||
try:
|
try:
|
||||||
wf_ex.save(session=session)
|
wf_ex.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for WorkflowExecution: %s" % e.columns
|
"Duplicate entry for WorkflowExecution: %s" % e.columns
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -656,7 +707,7 @@ def create_task_execution(values, session=None):
|
|||||||
try:
|
try:
|
||||||
task_ex.save(session=session)
|
task_ex.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for TaskExecution: %s" % e.columns
|
"Duplicate entry for TaskExecution: %s" % e.columns
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -716,8 +767,9 @@ def create_delayed_call(values, session=None):
|
|||||||
try:
|
try:
|
||||||
delayed_call.save(session)
|
delayed_call.save(session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry("Duplicate entry for DelayedCall: %s"
|
raise exc.DBDuplicateEntryException(
|
||||||
% e.columns)
|
"Duplicate entry for DelayedCall: %s" % e.columns
|
||||||
|
)
|
||||||
|
|
||||||
return delayed_call
|
return delayed_call
|
||||||
|
|
||||||
@ -846,14 +898,14 @@ def create_cron_trigger(values, session=None):
|
|||||||
try:
|
try:
|
||||||
cron_trigger.save(session=session)
|
cron_trigger.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for cron trigger %s: %s"
|
"Duplicate entry for cron trigger %s: %s"
|
||||||
% (cron_trigger.name, e.columns)
|
% (cron_trigger.name, e.columns)
|
||||||
)
|
)
|
||||||
# TODO(nmakhotkin): Remove this 'except' after fixing
|
# TODO(nmakhotkin): Remove this 'except' after fixing
|
||||||
# https://bugs.launchpad.net/oslo.db/+bug/1458583.
|
# https://bugs.launchpad.net/oslo.db/+bug/1458583.
|
||||||
except db_exc.DBError as e:
|
except db_exc.DBError as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for cron trigger: %s" % e
|
"Duplicate entry for cron trigger: %s" % e
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -935,7 +987,7 @@ def create_environment(values, session=None):
|
|||||||
try:
|
try:
|
||||||
env.save(session=session)
|
env.save(session=session)
|
||||||
except db_exc.DBDuplicateEntry as e:
|
except db_exc.DBDuplicateEntry as e:
|
||||||
raise exc.DBDuplicateEntry(
|
raise exc.DBDuplicateEntryException(
|
||||||
"Duplicate entry for Environment: %s" % e.columns
|
"Duplicate entry for Environment: %s" % e.columns
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ def validate_input(definition, input, spec=None):
|
|||||||
missing_param_names = []
|
missing_param_names = []
|
||||||
|
|
||||||
spec_input = (spec.get_input() if spec else
|
spec_input = (spec.get_input() if spec else
|
||||||
utils.get_input_dict_from_input_string(definition.input))
|
utils.get_dict_from_string(definition.input))
|
||||||
|
|
||||||
for p_name, p_value in six.iteritems(spec_input):
|
for p_name, p_value in six.iteritems(spec_input):
|
||||||
if p_value is utils.NotDefined and p_name not in input_param_names:
|
if p_value is utils.NotDefined and p_name not in input_param_names:
|
||||||
|
@ -59,11 +59,15 @@ class NotFoundException(MistralException):
|
|||||||
message = "Object not found"
|
message = "Object not found"
|
||||||
|
|
||||||
|
|
||||||
class DBDuplicateEntry(MistralException):
|
class DBDuplicateEntryException(MistralException):
|
||||||
http_code = 409
|
http_code = 409
|
||||||
message = "Database object already exists"
|
message = "Database object already exists"
|
||||||
|
|
||||||
|
|
||||||
|
class DBQueryEntryException(MistralException):
|
||||||
|
http_code = 400
|
||||||
|
|
||||||
|
|
||||||
class ActionException(MistralException):
|
class ActionException(MistralException):
|
||||||
http_code = 400
|
http_code = 400
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ def register_action_class(name, action_class_str, attributes,
|
|||||||
LOG.debug("Registering action in DB: %s" % name)
|
LOG.debug("Registering action in DB: %s" % name)
|
||||||
|
|
||||||
db_api.create_action_definition(values)
|
db_api.create_action_definition(values)
|
||||||
except exc.DBDuplicateEntry:
|
except exc.DBDuplicateEntryException:
|
||||||
LOG.debug("Action %s already exists in DB." % name)
|
LOG.debug("Action %s already exists in DB." % name)
|
||||||
|
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ MOCK_UPDATED_ACTION = mock.MagicMock(return_value=UPDATED_ACTION_DB)
|
|||||||
MOCK_DELETE = mock.MagicMock(return_value=None)
|
MOCK_DELETE = mock.MagicMock(return_value=None)
|
||||||
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
||||||
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
||||||
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
|
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
|
||||||
|
|
||||||
|
|
||||||
class TestActionsController(base.FunctionalTest):
|
class TestActionsController(base.FunctionalTest):
|
||||||
|
@ -71,7 +71,7 @@ MOCK_UPDATED_TRIGGER = mock.MagicMock(return_value=UPDATED_TRIGGER_DB)
|
|||||||
MOCK_DELETE = mock.MagicMock(return_value=None)
|
MOCK_DELETE = mock.MagicMock(return_value=None)
|
||||||
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
||||||
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
||||||
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
|
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
|
||||||
|
|
||||||
|
|
||||||
class TestCronTriggerController(base.FunctionalTest):
|
class TestCronTriggerController(base.FunctionalTest):
|
||||||
|
@ -108,7 +108,7 @@ MOCK_ENVIRONMENTS = mock.MagicMock(return_value=[ENVIRONMENT_DB])
|
|||||||
MOCK_UPDATED_ENVIRONMENT = mock.MagicMock(return_value=UPDATED_ENVIRONMENT_DB)
|
MOCK_UPDATED_ENVIRONMENT = mock.MagicMock(return_value=UPDATED_ENVIRONMENT_DB)
|
||||||
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
||||||
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
||||||
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
|
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
|
||||||
MOCK_DELETE = mock.MagicMock(return_value=None)
|
MOCK_DELETE = mock.MagicMock(return_value=None)
|
||||||
|
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ MOCK_UPDATED_WORKBOOK = mock.MagicMock(return_value=UPDATED_WORKBOOK_DB)
|
|||||||
MOCK_DELETE = mock.MagicMock(return_value=None)
|
MOCK_DELETE = mock.MagicMock(return_value=None)
|
||||||
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
||||||
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
||||||
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
|
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
|
||||||
|
|
||||||
|
|
||||||
class TestWorkbooksController(base.FunctionalTest):
|
class TestWorkbooksController(base.FunctionalTest):
|
||||||
|
@ -21,6 +21,7 @@ from mistral.db.v2 import api as db_api
|
|||||||
from mistral.db.v2.sqlalchemy import models
|
from mistral.db.v2.sqlalchemy import models
|
||||||
from mistral import exceptions as exc
|
from mistral import exceptions as exc
|
||||||
from mistral.tests.unit.api import base
|
from mistral.tests.unit.api import base
|
||||||
|
from mistral import utils
|
||||||
|
|
||||||
WF_DEFINITION = """
|
WF_DEFINITION = """
|
||||||
---
|
---
|
||||||
@ -37,6 +38,7 @@ flow:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
WF_DB = models.WorkflowDefinition(
|
WF_DB = models.WorkflowDefinition(
|
||||||
|
id='123e4567-e89b-12d3-a456-426655440000',
|
||||||
name='flow',
|
name='flow',
|
||||||
definition=WF_DEFINITION,
|
definition=WF_DEFINITION,
|
||||||
created_at=datetime.datetime(1970, 1, 1),
|
created_at=datetime.datetime(1970, 1, 1),
|
||||||
@ -45,6 +47,7 @@ WF_DB = models.WorkflowDefinition(
|
|||||||
)
|
)
|
||||||
|
|
||||||
WF = {
|
WF = {
|
||||||
|
'id': '123e4567-e89b-12d3-a456-426655440000',
|
||||||
'name': 'flow',
|
'name': 'flow',
|
||||||
'definition': WF_DEFINITION,
|
'definition': WF_DEFINITION,
|
||||||
'created_at': '1970-01-01 00:00:00',
|
'created_at': '1970-01-01 00:00:00',
|
||||||
@ -140,7 +143,7 @@ MOCK_UPDATED_WF = mock.MagicMock(return_value=UPDATED_WF_DB)
|
|||||||
MOCK_DELETE = mock.MagicMock(return_value=None)
|
MOCK_DELETE = mock.MagicMock(return_value=None)
|
||||||
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
||||||
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
||||||
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
|
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
|
||||||
|
|
||||||
|
|
||||||
class TestWorkflowsController(base.FunctionalTest):
|
class TestWorkflowsController(base.FunctionalTest):
|
||||||
@ -293,6 +296,75 @@ class TestWorkflowsController(base.FunctionalTest):
|
|||||||
|
|
||||||
self.assertEqual(len(resp.json['workflows']), 0)
|
self.assertEqual(len(resp.json['workflows']), 0)
|
||||||
|
|
||||||
|
@mock.patch.object(db_api, "get_workflow_definitions", MOCK_WFS)
|
||||||
|
def test_get_all_pagination(self):
|
||||||
|
resp = self.app.get(
|
||||||
|
'/v2/workflows?limit=1&sort_keys=id,name')
|
||||||
|
|
||||||
|
self.assertEqual(resp.status_int, 200)
|
||||||
|
|
||||||
|
self.assertIn('next', resp.json)
|
||||||
|
|
||||||
|
self.assertEqual(len(resp.json['workflows']), 1)
|
||||||
|
self.assertDictEqual(WF, resp.json['workflows'][0])
|
||||||
|
|
||||||
|
param_dict = utils.get_dict_from_string(
|
||||||
|
resp.json['next'].split('?')[1],
|
||||||
|
delimiter='&'
|
||||||
|
)
|
||||||
|
|
||||||
|
expected_dict = {
|
||||||
|
'marker': '123e4567-e89b-12d3-a456-426655440000',
|
||||||
|
'limit': 1,
|
||||||
|
'sort_keys': 'id,name',
|
||||||
|
'sort_dirs': 'asc,asc'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertDictEqual(expected_dict, param_dict)
|
||||||
|
|
||||||
|
def test_get_all_pagination_limit_negative(self):
|
||||||
|
resp = self.app.get(
|
||||||
|
'/v2/workflows?limit=-1&sort_keys=id,name&sort_dirs=asc,asc',
|
||||||
|
expect_errors=True
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(resp.status_int, 400)
|
||||||
|
|
||||||
|
self.assertIn("Limit must be positive", resp.body)
|
||||||
|
|
||||||
|
def test_get_all_pagination_limit_not_integer(self):
|
||||||
|
resp = self.app.get(
|
||||||
|
'/v2/workflows?limit=1.1&sort_keys=id,name&sort_dirs=asc,asc',
|
||||||
|
expect_errors=True
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(resp.status_int, 400)
|
||||||
|
|
||||||
|
self.assertIn("unable to convert to int", resp.body)
|
||||||
|
|
||||||
|
def test_get_all_pagination_invalid_sort_dirs_length(self):
|
||||||
|
resp = self.app.get(
|
||||||
|
'/v2/workflows?limit=1&sort_keys=id,name&sort_dirs=asc,asc,asc',
|
||||||
|
expect_errors=True
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(resp.status_int, 400)
|
||||||
|
|
||||||
|
self.assertIn(
|
||||||
|
"Length of sort_keys must be equal or greater than sort_dirs",
|
||||||
|
resp.body
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_get_all_pagination_unknown_direction(self):
|
||||||
|
resp = self.app.get(
|
||||||
|
'/v2/workflows?limit=1&sort_keys=id&sort_dirs=nonexist',
|
||||||
|
expect_errors=True
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(resp.status_int, 400)
|
||||||
|
|
||||||
|
self.assertIn("Unknown sort direction", resp.body)
|
||||||
|
|
||||||
def test_validate(self):
|
def test_validate(self):
|
||||||
resp = self.app.post(
|
resp = self.app.post(
|
||||||
'/v2/workflows/validate',
|
'/v2/workflows/validate',
|
||||||
|
@ -81,7 +81,7 @@ class WorkbookTest(SQLAlchemyTest):
|
|||||||
db_api.create_workbook(WORKBOOKS[0])
|
db_api.create_workbook(WORKBOOKS[0])
|
||||||
|
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exc.DBDuplicateEntry,
|
exc.DBDuplicateEntryException,
|
||||||
db_api.create_workbook,
|
db_api.create_workbook,
|
||||||
WORKBOOKS[0]
|
WORKBOOKS[0]
|
||||||
)
|
)
|
||||||
@ -262,7 +262,7 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
|
|||||||
db_api.create_workflow_definition(WF_DEFINITIONS[0])
|
db_api.create_workflow_definition(WF_DEFINITIONS[0])
|
||||||
|
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exc.DBDuplicateEntry,
|
exc.DBDuplicateEntryException,
|
||||||
db_api.create_workflow_definition,
|
db_api.create_workflow_definition,
|
||||||
WF_DEFINITIONS[0]
|
WF_DEFINITIONS[0]
|
||||||
)
|
)
|
||||||
@ -438,7 +438,7 @@ class ActionDefinitionTest(SQLAlchemyTest):
|
|||||||
db_api.create_action_definition(ACTION_DEFINITIONS[0])
|
db_api.create_action_definition(ACTION_DEFINITIONS[0])
|
||||||
|
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exc.DBDuplicateEntry,
|
exc.DBDuplicateEntryException,
|
||||||
db_api.create_action_definition,
|
db_api.create_action_definition,
|
||||||
ACTION_DEFINITIONS[0]
|
ACTION_DEFINITIONS[0]
|
||||||
)
|
)
|
||||||
@ -1091,7 +1091,7 @@ class CronTriggerTest(SQLAlchemyTest):
|
|||||||
db_api.create_cron_trigger(CRON_TRIGGERS[0])
|
db_api.create_cron_trigger(CRON_TRIGGERS[0])
|
||||||
|
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exc.DBDuplicateEntry,
|
exc.DBDuplicateEntryException,
|
||||||
db_api.create_cron_trigger,
|
db_api.create_cron_trigger,
|
||||||
CRON_TRIGGERS[0]
|
CRON_TRIGGERS[0]
|
||||||
)
|
)
|
||||||
@ -1217,7 +1217,7 @@ class EnvironmentTest(SQLAlchemyTest):
|
|||||||
db_api.create_environment(ENVIRONMENTS[0])
|
db_api.create_environment(ENVIRONMENTS[0])
|
||||||
|
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exc.DBDuplicateEntry,
|
exc.DBDuplicateEntryException,
|
||||||
db_api.create_environment,
|
db_api.create_environment,
|
||||||
ENVIRONMENTS[0]
|
ENVIRONMENTS[0]
|
||||||
)
|
)
|
||||||
@ -1400,7 +1400,7 @@ class TXTest(SQLAlchemyTest):
|
|||||||
|
|
||||||
db_api.create_workbook(WORKBOOKS[0])
|
db_api.create_workbook(WORKBOOKS[0])
|
||||||
|
|
||||||
except exc.DBDuplicateEntry:
|
except exc.DBDuplicateEntryException:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
self.assertFalse(self.is_db_session_open())
|
self.assertFalse(self.is_db_session_open())
|
||||||
|
@ -109,7 +109,7 @@ class TriggerServiceV2Test(base.DbTestCase):
|
|||||||
# But creation with the same count and first time
|
# But creation with the same count and first time
|
||||||
# simultaneously leads to error.
|
# simultaneously leads to error.
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exc.DBDuplicateEntry,
|
exc.DBDuplicateEntryException,
|
||||||
t_s.create_cron_trigger,
|
t_s.create_cron_trigger,
|
||||||
'test4',
|
'test4',
|
||||||
self.wf.name,
|
self.wf.name,
|
||||||
|
@ -36,7 +36,7 @@ class ExceptionTestCase(base.BaseTest):
|
|||||||
self.assertEqual(exc.http_code, 404)
|
self.assertEqual(exc.http_code, 404)
|
||||||
|
|
||||||
def test_duplicate_obj_code(self):
|
def test_duplicate_obj_code(self):
|
||||||
exc = exceptions.DBDuplicateEntry()
|
exc = exceptions.DBDuplicateEntryException()
|
||||||
self.assertIn("Database object already exists",
|
self.assertIn("Database object already exists",
|
||||||
six.text_type(exc))
|
six.text_type(exc))
|
||||||
self.assertEqual(exc.http_code, 409)
|
self.assertEqual(exc.http_code, 409)
|
||||||
|
@ -109,7 +109,7 @@ class UtilsTest(base.BaseTest):
|
|||||||
|
|
||||||
def test_get_input_dict_from_input_string(self):
|
def test_get_input_dict_from_input_string(self):
|
||||||
input_string = 'param1, param2=2, param3="var3"'
|
input_string = 'param1, param2=2, param3="var3"'
|
||||||
input_dict = utils.get_input_dict_from_input_string(input_string)
|
input_dict = utils.get_dict_from_string(input_string)
|
||||||
|
|
||||||
self.assertIn('param1', input_dict)
|
self.assertIn('param1', input_dict)
|
||||||
self.assertIn('param2', input_dict)
|
self.assertIn('param2', input_dict)
|
||||||
|
@ -205,11 +205,11 @@ class NotDefined(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def get_input_dict_from_input_string(input_string):
|
def get_dict_from_string(input_string, delimiter=','):
|
||||||
if not input_string:
|
if not input_string:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
raw_inputs = input_string.split(',')
|
raw_inputs = input_string.split(delimiter)
|
||||||
|
|
||||||
inputs = []
|
inputs = []
|
||||||
|
|
||||||
@ -218,7 +218,13 @@ def get_input_dict_from_input_string(input_string):
|
|||||||
name_value = input.split('=')
|
name_value = input.split('=')
|
||||||
|
|
||||||
if len(name_value) > 1:
|
if len(name_value) > 1:
|
||||||
inputs += [{name_value[0]: json.loads(name_value[1])}]
|
|
||||||
|
try:
|
||||||
|
value = json.loads(name_value[1])
|
||||||
|
except ValueError:
|
||||||
|
value = name_value[1]
|
||||||
|
|
||||||
|
inputs += [{name_value[0]: value}]
|
||||||
else:
|
else:
|
||||||
inputs += [name_value[0]]
|
inputs += [name_value[0]]
|
||||||
|
|
||||||
|
@ -50,3 +50,20 @@ def wrap_pecan_controller_exception(func):
|
|||||||
pecan.response.translatable_error = excp
|
pecan.response.translatable_error = excp
|
||||||
pecan.abort(excp.http_code, six.text_type(excp))
|
pecan.abort(excp.http_code, six.text_type(excp))
|
||||||
return wrapped
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
|
def validate_query_params(limit, sort_keys, sort_dirs):
|
||||||
|
if limit is not None and limit <= 0:
|
||||||
|
raise exc.ClientSideError("Limit must be positive.")
|
||||||
|
|
||||||
|
if len(sort_keys) < len(sort_dirs):
|
||||||
|
raise exc.ClientSideError("Length of sort_keys must be equal or "
|
||||||
|
"greater than sort_dirs.")
|
||||||
|
|
||||||
|
if len(sort_keys) > len(sort_dirs):
|
||||||
|
sort_dirs.extend(['asc'] * (len(sort_keys) - len(sort_dirs)))
|
||||||
|
|
||||||
|
for sort_dir in sort_dirs:
|
||||||
|
if sort_dir not in ['asc', 'desc']:
|
||||||
|
raise exc.ClientSideError("Unknown sort direction, must be 'desc' "
|
||||||
|
"or 'asc'")
|
||||||
|
Loading…
Reference in New Issue
Block a user