add tests for _query_to_kwargs func

add test case for _query_to_kwargs function in v2 api.
raise InvalidInput on invalid operator

Change-Id: Ia9e9b3905327d21e3c6f91aa0acd36f19f4f59b0
Partial-Bug: #1217129
This commit is contained in:
Gordon Chung
2013-08-26 18:40:36 -04:00
parent f505ae11b7
commit 05db82ac1b
2 changed files with 166 additions and 126 deletions

View File

@@ -250,8 +250,8 @@ def _query_to_kwargs(query, db_func, internal_keys=[], headers=None):
'project_id': 'project', 'project_id': 'project',
'resource_id': 'resource'} 'resource_id': 'resource'}
stamp = {} stamp = {}
trans = {}
metaquery = {} metaquery = {}
kwargs = {}
for i in query: for i in query:
if i.field == 'timestamp': if i.field == 'timestamp':
if i.op in ('lt', 'le'): if i.op in ('lt', 'le'):
@@ -261,22 +261,29 @@ def _query_to_kwargs(query, db_func, internal_keys=[], headers=None):
stamp['start_timestamp'] = i.value stamp['start_timestamp'] = i.value
stamp['start_timestamp_op'] = i.op stamp['start_timestamp_op'] = i.op
else: else:
LOG.warn('_query_to_kwargs ignoring %r unexpected op %r"' % raise wsme.exc.InvalidInput('op', i.op,
(i.field, i.op)) 'unimplemented operator for %s' %
i.field)
else: else:
if i.op != 'eq': if i.op == 'eq':
LOG.warn('_query_to_kwargs ignoring %r unimplemented op %r' % if i.field == 'search_offset':
(i.field, i.op))
elif i.field == 'search_offset':
stamp['search_offset'] = i.value stamp['search_offset'] = i.value
elif i.field.startswith('metadata.'): elif i.field.startswith('metadata.'):
metaquery[i.field] = i._get_value_as_type() metaquery[i.field] = i._get_value_as_type()
elif i.field.startswith('resource_metadata.'): elif i.field.startswith('resource_metadata.'):
metaquery[i.field[9:]] = i._get_value_as_type() metaquery[i.field[9:]] = i._get_value_as_type()
else: else:
trans[translation.get(i.field, i.field)] = i.value key = translation.get(i.field, i.field)
if key not in valid_keys:
msg = ("unrecognized field in query: %s, "
"valid keys: %s") % (query, valid_keys)
raise wsme.exc.UnknownArgument(key, msg)
kwargs[key] = i.value
else:
raise wsme.exc.InvalidInput('op', i.op,
'unimplemented operator for %s' %
i.field)
kwargs = {}
if metaquery and 'metaquery' in valid_keys: if metaquery and 'metaquery' in valid_keys:
kwargs['metaquery'] = metaquery kwargs['metaquery'] = metaquery
if stamp: if stamp:
@@ -295,14 +302,6 @@ def _query_to_kwargs(query, db_func, internal_keys=[], headers=None):
if 'end_timestamp_op' in stamp: if 'end_timestamp_op' in stamp:
kwargs['end_timestamp_op'] = stamp['end_timestamp_op'] kwargs['end_timestamp_op'] = stamp['end_timestamp_op']
if trans:
for k in trans:
if k not in valid_keys:
msg = ("unrecognized field in query: %s, valid keys: %s" %
(query, valid_keys))
raise wsme.exc.UnknownArgument(k, msg)
kwargs[k] = trans[k]
return kwargs return kwargs

View File

@@ -14,14 +14,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""Test the methods related to query.""" """Test the methods related to query."""
import datetime import datetime
import wsme import wsme
from ceilometer import storage
from ceilometer.api.controllers import v2 as api from ceilometer.api.controllers import v2 as api
from ceilometer.api.controllers.v2 import Query from ceilometer.api.controllers.v2 import Query
from ceilometer.api.controllers.v2 import _query_to_kwargs from ceilometer.openstack.common import timeutils
from ceilometer.tests import base as tests_base from ceilometer.tests import base as tests_base
@@ -135,108 +135,6 @@ class TestQuery(tests_base.TestCase):
expected = value expected = value
self.assertEqual(query._get_value_as_type(), expected) self.assertEqual(query._get_value_as_type(), expected)
def _fake_db_func(self, resource, on_behalf_of, x, y,
metaquery={}, user=None, project=None,
start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None, **kwargs):
pass
def test_query_to_kwargs_exclude_internal(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['y', 'on_behalf_of', 'x']]
self.assertRaises(wsme.exc.ClientSideError,
_query_to_kwargs,
queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'},
internal_keys=['on_behalf_of'])
def test_query_to_kwargs_self_always_excluded(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['x', 'y']]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertFalse('self' in kwargs)
def test_query_to_kwargs_timestamp_mapping(self):
start = datetime.datetime.utcnow()
end = datetime.datetime.utcnow()
queries = [Query(field='timestamp',
op='gt',
value=start.isoformat(),
type='string'),
Query(field='timestamp',
op='le',
value=end.isoformat(),
type='string')]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertEqual(kwargs.get('start_timestamp'), start)
self.assertEqual(kwargs.get('start_timestamp_op'), 'gt')
self.assertEqual(kwargs.get('end_timestamp'), end)
self.assertEqual(kwargs.get('end_timestamp_op'), 'le')
def test_query_to_kwargs_non_equality_on_metadata(self):
queries = [Query(field='resource_metadata.image_id',
op='gt',
value='image',
type='string'),
Query(field='metadata.ramdisk_id',
op='le',
value='ramdisk',
type='string')]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertFalse('metaquery' in kwargs)
def test_query_to_kwargs_equality_on_metadata(self):
queries = [Query(field='resource_metadata.image_id',
op='eq',
value='image',
type='string'),
Query(field='metadata.ramdisk_id',
op='eq',
value='ramdisk',
type='string')]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
self.assertTrue('metaquery' in kwargs)
metaquery = kwargs['metaquery']
self.assertEqual(metaquery.get('metadata.image_id'), 'image')
self.assertEqual(metaquery.get('metadata.ramdisk_id'), 'ramdisk')
def test_query_to_kwargs_translation(self):
queries = [Query(field=f,
op='eq',
value='fake_%s' % f,
type='string') for f in ['user_id',
'project_id',
'resource_id']]
kwargs = _query_to_kwargs(queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
for o in ['user', 'project', 'resource']:
self.assertEqual(kwargs.get(o), 'fake_%s_id' % o)
def test_query_to_kwargs_unrecognized(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['y', 'z', 'x']]
self.assertRaises(wsme.exc.ClientSideError,
_query_to_kwargs,
queries,
self._fake_db_func,
headers={'X-ProjectId': 'foobar'})
class TestValidateGroupByFields(tests_base.TestCase): class TestValidateGroupByFields(tests_base.TestCase):
@@ -265,3 +163,146 @@ class TestValidateGroupByFields(tests_base.TestCase):
api._validate_groupby_fields(['user_id', 'source', 'user_id']) api._validate_groupby_fields(['user_id', 'source', 'user_id'])
) )
self.assertEqual(result, set(['user_id', 'source'])) self.assertEqual(result, set(['user_id', 'source']))
class TestQueryToKwArgs(tests_base.TestCase):
def setUp(self):
super(TestQueryToKwArgs, self).setUp()
self.stubs.Set(api, '_sanitize_query', lambda x, y, **z: x)
def test_sample_filter_single(self):
q = [Query(field='user_id',
op='eq',
value='uid')]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertIn('user', kwargs)
self.assertEqual(len(kwargs), 1)
self.assertEqual(kwargs['user'], 'uid')
def test_sample_filter_multi(self):
q = [Query(field='user_id',
op='eq',
value='uid'),
Query(field='project_id',
op='eq',
value='pid'),
Query(field='resource_id',
op='eq',
value='rid'),
Query(field='source',
op='eq',
value='source_name'),
Query(field='meter',
op='eq',
value='meter_name')]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertEqual(len(kwargs), 5)
self.assertEqual(kwargs['user'], 'uid')
self.assertEqual(kwargs['project'], 'pid')
self.assertEqual(kwargs['resource'], 'rid')
self.assertEqual(kwargs['source'], 'source_name')
self.assertEqual(kwargs['meter'], 'meter_name')
def test_sample_filter_timestamp(self):
ts_start = timeutils.utcnow()
ts_end = ts_start + datetime.timedelta(minutes=5)
q = [Query(field='timestamp',
op='lt',
value=str(ts_end)),
Query(field='timestamp',
op='gt',
value=str(ts_start))]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertEqual(len(kwargs), 4)
self.assertEqual(kwargs['start'], ts_start)
self.assertEqual(kwargs['end'], ts_end)
self.assertEqual(kwargs['start_timestamp_op'], 'gt')
self.assertEqual(kwargs['end_timestamp_op'], 'lt')
def test_sample_filter_meta(self):
q = [Query(field='metadata.size',
op='eq',
value='20'),
Query(field='resource_metadata.id',
op='eq',
value='meta_id')]
kwargs = api._query_to_kwargs(q, storage.SampleFilter.__init__)
self.assertEqual(len(kwargs), 1)
self.assertEqual(len(kwargs['metaquery']), 2)
self.assertEqual(kwargs['metaquery']['metadata.size'], 20)
self.assertEqual(kwargs['metaquery']['metadata.id'], 'meta_id')
def test_sample_filter_non_equality_on_metadata(self):
queries = [Query(field='resource_metadata.image_id',
op='gt',
value='image',
type='string'),
Query(field='metadata.ramdisk_id',
op='le',
value='ramdisk',
type='string')]
self.assertRaises(
wsme.exc.InvalidInput,
api._query_to_kwargs,
queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'})
def test_sample_filter_invalid_field(self):
q = [Query(field='invalid',
op='eq',
value='20')]
self.assertRaises(
wsme.exc.UnknownArgument,
api._query_to_kwargs, q, storage.SampleFilter.__init__)
def test_sample_filter_invalid_op(self):
q = [Query(field='user_id',
op='lt',
value='20')]
self.assertRaises(
wsme.exc.InvalidInput,
api._query_to_kwargs, q, storage.SampleFilter.__init__)
def test_sample_filter_timestamp_invalid_op(self):
ts_start = timeutils.utcnow()
q = [Query(field='timestamp',
op='eq',
value=str(ts_start))]
self.assertRaises(
wsme.exc.InvalidInput,
api._query_to_kwargs, q, storage.SampleFilter.__init__)
def test_sample_filter_exclude_internal(self):
queries = [Query(field=f,
op='eq',
value='fake',
type='string') for f in ['y', 'on_behalf_of', 'x']]
self.assertRaises(wsme.exc.ClientSideError,
api._query_to_kwargs,
queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'},
internal_keys=['on_behalf_of'])
def test_sample_filter_self_always_excluded(self):
queries = [Query(field='user_id',
op='eq',
value='20')]
kwargs = api._query_to_kwargs(queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'})
self.assertFalse('self' in kwargs)
def test_sample_filter_translation(self):
queries = [Query(field=f,
op='eq',
value='fake_%s' % f,
type='string') for f in ['user_id',
'project_id',
'resource_id']]
kwargs = api._query_to_kwargs(queries,
storage.SampleFilter.__init__,
headers={'X-ProjectId': 'foobar'})
for o in ['user', 'project', 'resource']:
self.assertEqual(kwargs.get(o), 'fake_%s_id' % o)