Merge "Add fields filter for workflow query"

Jenkins authored 2015-09-02 08:41:26 +00:00; committed by Gerrit Code Review
commit 11bbd8501e
9 changed files with 149 additions and 37 deletions

View File

@@ -61,6 +61,12 @@ class Resource(wtypes.Base):
     def to_string(self):
         return json.dumps(self.to_dict())

+    @classmethod
+    def get_fields(cls):
+        obj = cls()
+
+        return [attr.name for attr in obj._wsme_attributes]
+

 class ResourceList(Resource):
     """Resource containing the list of other resources."""
@@ -73,7 +79,8 @@ class ResourceList(Resource):
         return getattr(self, self._type)

     @classmethod
-    def convert_with_links(cls, resources, limit, url=None, **kwargs):
+    def convert_with_links(cls, resources, limit, url=None, fields=None,
+                           **kwargs):
         resource_collection = cls()

         setattr(resource_collection, resource_collection._type, resources)
@@ -81,6 +88,7 @@ class ResourceList(Resource):
         resource_collection.next = resource_collection.get_next(
             limit,
             url=url,
+            fields=fields,
             **kwargs
         )

@@ -90,7 +98,7 @@ class ResourceList(Resource):
         """Return whether resources has more items."""
         return len(self.collection) and len(self.collection) == limit

-    def get_next(self, limit, url=None, **kwargs):
+    def get_next(self, limit, url=None, fields=None, **kwargs):
         """Return a link to the next subset of the resources."""
         if not self.has_next(limit):
             return wtypes.Unset
@@ -99,11 +107,19 @@ class ResourceList(Resource):
             ['%s=%s&' % (key, value) for key, value in kwargs.items()]
         )

-        resource_args = '?%(args)slimit=%(limit)d&marker=%(marker)s' % {
-            'args': q_args,
-            'limit': limit,
-            'marker': self.collection[-1].id
-        }
+        resource_args = (
+            '?%(args)slimit=%(limit)d&marker=%(marker)s' %
+            {
+                'args': q_args,
+                'limit': limit,
+                'marker': self.collection[-1].id
+            }
+        )
+
+        # Fields is handled specially here, we can move it above when it's
+        # supported by all resources query.
+        if fields:
+            resource_args += '&fields=%s' % fields

         next_link = "%(host_url)s/v2/%(resource)s%(args)s" % {
             'host_url': url,
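
For illustration, a minimal, self-contained sketch of the link-building logic above; the host, marker and query values are made up and are not part of the commit:

# Illustrative sketch only: mirrors get_next() with hypothetical values.
q_args = 'sort_keys=created_at&sort_dirs=asc&'
limit = 20
marker = '123e4567-e89b-12d3-a456-426655440000'
fields = 'id,name'

resource_args = (
    '?%(args)slimit=%(limit)d&marker=%(marker)s' %
    {'args': q_args, 'limit': limit, 'marker': marker}
)

if fields:
    resource_args += '&fields=%s' % fields

next_link = "%(host_url)s/v2/%(resource)s%(args)s" % {
    'host_url': 'http://mistral.example.com:8989',
    'resource': 'workflows',
    'args': resource_args,
}

print(next_link)
# http://mistral.example.com:8989/v2/workflows?sort_keys=created_at&sort_dirs=asc&limit=20&marker=123e4567-e89b-12d3-a456-426655440000&fields=id,name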

View File

@@ -72,15 +72,16 @@ class Workflow(resource.Resource):
             if hasattr(e, key):
                 setattr(e, key, val)

-        input = d['spec'].get('input', [])
-        for param in input:
-            if isinstance(param, dict):
-                for k, v in param.items():
-                    input_list.append("%s=%s" % (k, v))
-            else:
-                input_list.append(param)
+        if 'spec' in d:
+            input = d.get('spec', {}).get('input', [])
+            for param in input:
+                if isinstance(param, dict):
+                    for k, v in param.items():
+                        input_list.append("%s=%s" % (k, v))
+                else:
+                    input_list.append(param)

-        setattr(e, 'input', ", ".join(input_list) if input_list else None)
+        setattr(e, 'input', ", ".join(input_list) if input_list else '')

         return e
@@ -175,9 +176,9 @@ class WorkflowsController(rest.RestController, hooks.HookController):

     @rest_utils.wrap_pecan_controller_exception
     @wsme_pecan.wsexpose(Workflows, types.uuid, int, types.uniquelist,
-                         types.list)
+                         types.list, types.uniquelist)
     def get_all(self, marker=None, limit=None, sort_keys='created_at',
-                sort_dirs='asc'):
+                sort_dirs='asc', fields=''):
         """Return a list of workflows.

         :param marker: Optional. Pagination marker for large data sets.
@@ -189,14 +190,23 @@ class WorkflowsController(rest.RestController, hooks.HookController):
         :param sort_dirs: Optional. Directions to sort corresponding to
                           sort_keys, "asc" or "desc" can be choosed.
                           Default: asc.
+        :param fields: Optional. A specified list of fields of the resource to
+                       be returned. 'id' will be included automatically in
+                       fields if it's provided, since it will be used when
+                       constructing 'next' link.

         Where project_id is the same as the requester or
         project_id is different but the scope is public.
         """
         LOG.info("Fetch workflows. marker=%s, limit=%s, sort_keys=%s, "
-                 "sort_dirs=%s", marker, limit, sort_keys, sort_dirs)
+                 "sort_dirs=%s, fields=%s", marker, limit, sort_keys,
+                 sort_dirs, fields)
+
+        if fields and 'id' not in fields:
+            fields.insert(0, 'id')

         rest_utils.validate_query_params(limit, sort_keys, sort_dirs)
+        rest_utils.validate_fields(fields, Workflow.get_fields())

         marker_obj = None
@@ -207,16 +217,22 @@ class WorkflowsController(rest.RestController, hooks.HookController):
             limit=limit,
             marker=marker_obj,
             sort_keys=sort_keys,
-            sort_dirs=sort_dirs
+            sort_dirs=sort_dirs,
+            fields=fields
         )

-        workflows_list = [Workflow.from_dict(db_model.to_dict())
-                          for db_model in db_workflows]
+        workflows_list = []
+
+        for data in db_workflows:
+            workflow_dict = (dict(zip(fields, data)) if fields else
+                             data.to_dict())
+            workflows_list.append(Workflow.from_dict(workflow_dict))

         return Workflows.convert_with_links(
             workflows_list,
             limit,
             pecan.request.host_url,
             sort_keys=','.join(sort_keys),
-            sort_dirs=','.join(sort_dirs)
+            sort_dirs=','.join(sort_dirs),
+            fields=','.join(fields) if fields else ''
         )
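
A quick sketch of the row handling above: when a fields filter is in effect, the DB layer returns plain column tuples rather than model objects, and dict(zip(fields, data)) pairs each tuple back up with the requested field names (row values below are hypothetical). The tempest test later in this commit exercises the same path via GET /v2/workflows?fields=name.

# Illustrative sketch: rebuilding resource dicts from column tuples.
fields = ['id', 'name']

db_workflows = [
    ('123e4567-e89b-12d3-a456-426655440000', 'my_workflow'),  # a tuple, not a model
]

workflows_list = [dict(zip(fields, data)) for data in db_workflows]

print(workflows_list)
# [{'id': '123e4567-e89b-12d3-a456-426655440000', 'name': 'my_workflow'}]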

View File

@@ -181,9 +181,14 @@ def get_driver_name(session=None):


 @session_aware()
-def model_query(model, session=None):
+def model_query(model, columns=(), session=None):
     """Query helper.

-    :param model: base model to query
+    :param model: Base model to query.
+    :param columns: Optional. Which columns to be queried.
     """
+    if columns:
+        return session.query(*columns)
+
     return session.query(model)
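
A self-contained sketch of the column-only query pattern introduced here, using a toy model and an in-memory SQLite database rather than Mistral's real models; querying selected columns returns plain tuples instead of model instances:

# Illustrative sketch; assumes SQLAlchemy is available.
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class WorkflowDefinition(Base):
    __tablename__ = 'workflow_definitions_v2'

    id = sa.Column(sa.String(36), primary_key=True)
    name = sa.Column(sa.String(80))


engine = sa.create_engine('sqlite://')
Base.metadata.create_all(engine)

session = orm.Session(bind=engine)
session.add(WorkflowDefinition(id='wf-1', name='my_workflow'))
session.commit()

columns = (WorkflowDefinition.id, WorkflowDefinition.name)

print(session.query(*columns).all())            # [('wf-1', 'my_workflow')]
print(session.query(WorkflowDefinition).all())  # [<WorkflowDefinition object ...>]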

View File

@@ -116,15 +116,14 @@ def load_workflow_definition(name):
     return IMPL.load_workflow_definition(name)


-# NOTE(xylan): We just leave filter param here for future usage
-def get_workflow_definitions(filters=None, limit=None, marker=None,
-                             sort_keys=None, sort_dirs=None, **kwargs):
+def get_workflow_definitions(limit=None, marker=None, sort_keys=None,
+                             sort_dirs=None, fields=None, **kwargs):
     return IMPL.get_workflow_definitions(
-        filters=filters,
         limit=limit,
         marker=marker,
         sort_keys=sort_keys,
         sort_dirs=sort_dirs,
+        fields=fields,
         **kwargs
     )

View File

@@ -102,8 +102,8 @@ def acquire_lock(model, id, session=None):
         sqlite_lock.acquire_lock(id, session)


-def _secure_query(model):
-    query = b.model_query(model)
+def _secure_query(model, *columns):
+    query = b.model_query(model, columns)

     if issubclass(model, mb.MistralSecureModelBase):
         query = query.filter(
@@ -235,6 +235,18 @@ def delete_workbooks(**kwargs):


 # Workflow definitions.

+WORKFLOW_COL_MAPPING = {
+    'id': models.WorkflowDefinition.id,
+    'name': models.WorkflowDefinition.name,
+    'input': models.WorkflowDefinition.spec,
+    'definition': models.WorkflowDefinition.definition,
+    'tags': models.WorkflowDefinition.tags,
+    'scope': models.WorkflowDefinition.scope,
+    'created_at': models.WorkflowDefinition.created_at,
+    'updated_at': models.WorkflowDefinition.updated_at
+}
+
+
 def get_workflow_definition(name):
     wf_def = _get_workflow_definition(name)
@@ -261,10 +273,13 @@ def load_workflow_definition(name):
     return _get_workflow_definition(name)


-# NOTE(xylan): We just leave filter param here for future usage
-def get_workflow_definitions(filters=None, limit=None, marker=None,
-                             sort_keys=None, sort_dirs=None, **kwargs):
-    query = _secure_query(models.WorkflowDefinition)
+def get_workflow_definitions(limit=None, marker=None, sort_keys=None,
+                             sort_dirs=None, fields=None, **kwargs):
+    columns = (
+        tuple(WORKFLOW_COL_MAPPING.get(f) for f in fields) if fields else ()
+    )
+
+    query = _secure_query(models.WorkflowDefinition, *columns)

     try:
         return _paginate_query(
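
A short sketch of how the requested field names are translated into model columns before the query is issued; string stand-ins replace the real Column objects so the snippet runs on its own. Note that WORKFLOW_COL_MAPPING.get() would yield None for an unknown field, which is why the controller validates fields against Workflow.get_fields() first.

# Illustrative sketch: field names -> columns lookup (stand-in values).
WORKFLOW_COL_MAPPING = {
    'id': 'WorkflowDefinition.id',
    'name': 'WorkflowDefinition.name',
    'scope': 'WorkflowDefinition.scope',
}

fields = ['id', 'name']

columns = tuple(WORKFLOW_COL_MAPPING.get(f) for f in fields) if fields else ()

print(columns)
# ('WorkflowDefinition.id', 'WorkflowDefinition.name')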

View File

@@ -148,6 +148,15 @@ class WorkflowTestsV2(base.TestCase):

         self.assertNotIn('next', body)

+    @test.attr(type='smoke')
+    def test_get_list_workflows_with_fields(self):
+        resp, body = self.client.get_list_obj('workflows?fields=name')
+
+        self.assertEqual(200, resp.status)
+
+        for wf in body['workflows']:
+            self.assertListEqual(['id', 'name'], wf.keys())
+
     @test.attr(type='smoke')
     def test_get_list_workflows_with_pagination(self):
         resp, body = self.client.get_list_obj(

View File

@@ -244,7 +244,9 @@ class TestActionsController(base.FunctionalTest):
             'sort_dirs': 'asc,asc'
         }

-        self.assertDictEqual(expected_dict, param_dict)
+        self.assertTrue(
+            set(expected_dict.items()).issubset(set(param_dict.items()))
+        )

     def test_get_all_pagination_limit_negative(self):
         resp = self.app.get(

View File

@@ -317,7 +317,7 @@ class TestWorkflowsController(base.FunctionalTest):
             'marker': '123e4567-e89b-12d3-a456-426655440000',
             'limit': 1,
             'sort_keys': 'id,name',
-            'sort_dirs': 'asc,asc'
+            'sort_dirs': 'asc,asc',
         }

         self.assertDictEqual(expected_dict, param_dict)
@@ -365,6 +365,37 @@

         self.assertIn("Unknown sort direction", resp.body)

+    @mock.patch('mistral.db.v2.api.get_workflow_definitions')
+    def test_get_all_with_fields_filter(self, mock_get_db_wfs):
+        mock_get_db_wfs.return_value = [
+            ('123e4567-e89b-12d3-a456-426655440000', 'fake_name')
+        ]
+
+        resp = self.app.get('/v2/workflows?fields=name')
+
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(len(resp.json['workflows']), 1)
+
+        expected_dict = {
+            'id': '123e4567-e89b-12d3-a456-426655440000',
+            'name': 'fake_name'
+        }
+
+        self.assertDictEqual(expected_dict, resp.json['workflows'][0])
+
+    def test_get_all_with_invalid_field(self):
+        resp = self.app.get(
+            '/v2/workflows?fields=name,nonexist',
+            expect_errors=True
+        )
+
+        self.assertEqual(resp.status_int, 400)
+
+        self.assertIn(
+            "nonexist are invalid",
+            resp.body
+        )
+
     def test_validate(self):
         resp = self.app.post(
             '/v2/workflows/validate',

View File

@@ -66,4 +66,23 @@ def validate_query_params(limit, sort_keys, sort_dirs):
     for sort_dir in sort_dirs:
         if sort_dir not in ['asc', 'desc']:
             raise exc.ClientSideError("Unknown sort direction, must be 'desc' "
-                                      "or 'asc'")
+                                      "or 'asc'.")
+
+
+def validate_fields(fields, object_fields):
+    """Check for requested non-existent fields.
+
+    Check if the user requested non-existent fields.
+
+    :param fields: A list of fields requested by the user.
+    :param object_fields: A list of fields supported by the object.
+    """
+    if not fields:
+        return
+
+    invalid_fields = set(fields) - set(object_fields)
+
+    if invalid_fields:
+        raise exc.ClientSideError(
+            'Field(s) %s are invalid.' % ', '.join(invalid_fields)
+        )
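
A self-contained sketch of the validation behaviour added above; a plain ValueError stands in for wsme's exc.ClientSideError so the snippet has no WSME dependency:

# Illustrative sketch of validate_fields().
def validate_fields(fields, object_fields):
    if not fields:
        return

    invalid_fields = set(fields) - set(object_fields)

    if invalid_fields:
        raise ValueError(
            'Field(s) %s are invalid.' % ', '.join(invalid_fields)
        )


workflow_fields = ['id', 'name', 'definition', 'tags', 'scope']

validate_fields(['id', 'name'], workflow_fields)   # passes silently

try:
    validate_fields(['id', 'nonexist'], workflow_fields)
except ValueError as e:
    print(e)   # Field(s) nonexist are invalid.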