Implement complex query API

Complex query computes the single complex query to multiple simple
queries which only use AND operator, each simple query will invoke
get_samples() method. Complex query collects all the results till
limit constraint is satisfied.

Change-Id: I0da398d4d3627fcbfe2c686acf5b0fa9a68492b7
This commit is contained in:
ZhiQiang Fan 2015-12-09 10:26:36 -07:00
parent 34b63f8cbe
commit 143a308b8c
3 changed files with 266 additions and 11 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
*.egg*
*.pyc
.tox
.testrepository
simulator

View File

@ -17,6 +17,7 @@
"""
import datetime
import operator
from monascaclient import exc as monasca_exc
from oslo_config import cfg
@ -28,6 +29,7 @@ import ceilometer
from ceilometer.i18n import _
from ceilometer import monasca_client
from ceilometer.publisher.monasca_data_filter import MonascaDataFilter
from ceilometer import storage
from ceilometer.storage import base
from ceilometer.storage import models as api_models
from ceilometer import utils
@ -53,7 +55,7 @@ AVAILABLE_CAPABILITIES = {
'groupby': False,
'query': {'simple': True,
'metadata': True,
'complex': False}},
'complex': True}},
'statistics': {'groupby': False,
'query': {'simple': True,
'metadata': False},
@ -358,7 +360,11 @@ class Connection(base.Connection):
user_id=sample_filter.user,
project_id=sample_filter.project,
resource_id=sample_filter.resource,
source=sample_filter.source
source=sample_filter.source,
# Dynamic sample filter attributes, these fields are useful for
# filtering result.
unit=getattr(sample_filter, 'unit', None),
type=getattr(sample_filter, 'type', None),
)
_dimensions = {k: v for k, v in _dimensions.items() if v is not None}
@ -664,3 +670,174 @@ class Connection(base.Connection):
groupby={u'': u''},
**stats_dict
)
def _parse_to_filter_list(self, filter_expr):
"""Parse complex query expression to simple filter list.
For i.e. parse:
{"or":[{"=":{"meter":"cpu"}},{"=":{"meter":"memory"}}]}
to
[[{"=":{"counter_name":"cpu"}}],
[{"=":{"counter_name":"memory"}}]]
"""
op, nodes = filter_expr.items()[0]
msg = "%s operand is not supported" % op
if op == 'or':
filter_list = []
for node in nodes:
filter_list.extend(self._parse_to_filter_list(node))
return filter_list
elif op == 'and':
filter_list_subtree = []
for node in nodes:
filter_list_subtree.append(self._parse_to_filter_list(node))
filter_list = [[]]
for filters in filter_list_subtree:
tmp = []
for filter in filters:
for f in filter_list:
tmp.append(f + filter)
filter_list = tmp
return filter_list
elif op == 'not':
raise ceilometer.NotImplementedError(msg)
elif op in ("<", "<=", "=", ">=", ">", '!='):
return [[filter_expr]]
else:
raise ceilometer.NotImplementedError(msg)
def _parse_to_sample_filter(self, simple_filters):
"""Parse to simple filters to sample filter.
For i.e.: parse
[{"=":{"counter_name":"cpu"}},{"=":{"counter_volume": 1}}]
to
SampleFilter(counter_name="cpu", counter_volume=1)
"""
equal_only_fields = (
'counter_name',
'counter_unit',
'counter_type',
'project_id',
'user_id',
'source',
'resource_id',
# These fields are supported by Ceilometer but cannot supported
# by Monasca.
# 'message_id',
# 'message_signature',
# 'recorded_at',
)
field_map = {
"project_id": "project",
"user_id": "user",
"resource_id": "resource",
"counter_name": "meter",
"counter_type": "type",
"counter_unit": "unit",
}
msg = "operand %s cannot be applied to field %s"
kwargs = {'metaquery': {}}
for sf in simple_filters:
op = sf.keys()[0]
field, value = sf.values()[0].items()[0]
if field in equal_only_fields:
if op != '=':
raise ceilometer.NotImplementedError(msg % (op, field))
field = field_map.get(field, field)
kwargs[field] = value
elif field == 'timestamp':
if op == '>=':
kwargs['start_timestamp'] = value
kwargs['start_timestamp_op'] = 'ge'
elif op == '<=':
kwargs['end_timestamp'] = value
kwargs['end_timestamp_op'] = 'le'
else:
raise ceilometer.NotImplementedError(msg % (op, field))
elif field == 'counter_volume':
kwargs['volume'] = value
kwargs['volume_op'] = op
elif (field.startswith('resource_metadata.') or
field.startswith('metadata.')):
kwargs['metaquery'][field] = value
else:
ra_msg = "field %s is not supported" % field
raise ceilometer.NotImplementedError(ra_msg)
sample_type = kwargs.pop('type', None)
sample_unit = kwargs.pop('unit', None)
sample_volume = kwargs.pop('volume', None)
sample_volume_op = kwargs.pop('volume_op', None)
sample_filter = storage.SampleFilter(**kwargs)
# Add some dynamic attributes, type and unit attributes can be used
# when query Monasca API, volume and volime_op attributes can
# be used for volume comparison.
sample_filter.type = sample_type
sample_filter.unit = sample_unit
sample_filter.volume = sample_volume
sample_filter.volume_op = sample_volume_op
return sample_filter
def _parse_to_sample_filters(self, filter_expr):
"""Parse complex query expression to sample filter list."""
filter_list = self._parse_to_filter_list(filter_expr)
sample_filters = []
for filters in filter_list:
sf = self._parse_to_sample_filter(filters)
if sf:
sample_filters.append(sf)
return sample_filters
def _validate_samples_by_volume(self, samples, sf):
if not sf.volume:
return samples
op_func_map = {
'<': operator.lt,
'<=': operator.le,
'=': operator.eq,
'>=': operator.ge,
'>': operator.gt,
'!=': operator.ne,
}
ret = []
for s in samples:
op_func = op_func_map[sf.volume_op]
volume = getattr(s, 'volume', getattr(s, 'counter_volume', None))
if op_func(volume, sf.volume):
ret.append(s)
return ret
def query_samples(self, filter_expr=None, orderby=None, limit=None):
if not filter_expr:
msg = "fitler must be specified"
raise ceilometer.NotImplementedError(msg)
if orderby:
msg = "orderby is not supported"
raise ceilometer.NotImplementedError(msg)
if not limit:
msg = "limit must be specified"
raise ceilometer.NotImplementedError(msg)
LOG.debug("filter_expr = %s", filter_expr)
sample_filters = self._parse_to_sample_filters(filter_expr)
LOG.debug("sample_filters = %s", sample_filters)
ret = []
for sf in sample_filters:
if not sf.volume:
samples = list(self.get_samples(sf, limit))
else:
samples = self.get_samples(sf)
samples = list(self._validate_samples_by_volume(samples, sf))
if limit <= len(samples):
ret.extend(samples[0:limit])
break
else:
ret.extend(samples)
limit -= len(samples)
return ret

View File

@ -23,8 +23,9 @@ from oslotest import base
import ceilometer
from ceilometer.api.controllers.v2.meters import Aggregate
import ceilometer.storage as storage
from ceilometer import storage
from ceilometer.storage import impl_monasca
from ceilometer.storage import models as storage_models
class TestGetResources(base.BaseTestCase):
@ -460,10 +461,7 @@ class TestGetSamples(base.BaseTestCase):
self.assertEqual(1, ml_mock.call_count)
class MeterStatisticsTest(base.BaseTestCase):
Aggregate = collections.namedtuple("Aggregate", ['func', 'param'])
class _BaseTestCase(base.BaseTestCase):
def assertRaisesWithMessage(self, msg, exc_class, func, *args, **kwargs):
try:
func(*args, **kwargs)
@ -471,9 +469,14 @@ class MeterStatisticsTest(base.BaseTestCase):
exc_class.__name__)
except AssertionError:
raise
except Exception as e:
self.assertIsInstance(e, exc_class)
self.assertEqual(e.message, msg)
# Only catch specific exception so we can get stack trace when fail
except exc_class as e:
self.assertEqual(msg, e.message)
class MeterStatisticsTest(_BaseTestCase):
Aggregate = collections.namedtuple("Aggregate", ['func', 'param'])
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_not_implemented_params(self, mock_mdf):
@ -678,6 +681,79 @@ class MeterStatisticsTest(base.BaseTestCase):
stat.period_end.isoformat())
class TestQuerySamples(_BaseTestCase):
def setUp(self):
super(TestQuerySamples, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF([], project='ceilometer', validate_default_values=True)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_query_samples_not_implemented_params(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client"):
conn = impl_monasca.Connection("127.0.0.1:8080")
query = {'or': [{'=': {"project_id": "123"}},
{'=': {"user_id": "456"}}]}
self.assertRaisesWithMessage(
'fitler must be specified',
ceilometer.NotImplementedError,
lambda: list(conn.query_samples()))
self.assertRaisesWithMessage(
'limit must be specified',
ceilometer.NotImplementedError,
lambda: list(conn.query_samples(query)))
order_by = [{"timestamp": "desc"}]
self.assertRaisesWithMessage(
'orderby is not supported',
ceilometer.NotImplementedError,
lambda: list(conn.query_samples(query, order_by)))
self.assertRaisesWithMessage(
'Supply meter name at the least',
ceilometer.NotImplementedError,
lambda: list(conn.query_samples(query, None, 1)))
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_query_samples(self, mdf_mock):
SAMPLES = [[
storage_models.Sample(
counter_name="instance",
counter_type="gauge",
counter_unit="instance",
counter_volume=1,
project_id="123",
user_id="456",
resource_id="789",
resource_metadata={},
source="openstack",
recorded_at=timeutils.utcnow(),
timestamp=timeutils.utcnow(),
message_id="0",
message_signature='',)
]] * 2
samples = SAMPLES[:]
def _get_samples(*args, **kwargs):
return samples.pop()
with mock.patch("ceilometer.monasca_client.Client"):
conn = impl_monasca.Connection("127.0.0.1:8080")
with mock.patch.object(conn, 'get_samples') as gsm:
gsm.side_effect = _get_samples
query = {'or': [{'=': {"project_id": "123"}},
{'=': {"user_id": "456"}}]}
samples = conn.query_samples(query, None, 100)
self.assertEqual(2, len(samples))
self.assertEqual(2, gsm.call_count)
samples = SAMPLES[:]
query = {'and': [{'=': {"project_id": "123"}},
{'>': {"counter_volume": 2}}]}
samples = conn.query_samples(query, None, 100)
self.assertEqual(0, len(samples))
self.assertEqual(3, gsm.call_count)
class CapabilitiesTest(base.BaseTestCase):
def test_capabilities(self):
@ -704,7 +780,7 @@ class CapabilitiesTest(base.BaseTestCase):
'pagination': False,
'query':
{
'complex': False,
'complex': True,
'metadata': True,
'simple': True
}