get_resources and get_samples uses iterative approach
Implements get_resources and get_samples using iterative approach with limit, monasca measurements api accepts a limit, but cannot be used here since user can query for measurements from different metrics. Change-Id: I87d070c60af3435d20917ffc5d07adf8ed749ee2
This commit is contained in:
parent
b704e3f9c7
commit
ff9bbb979a
|
@ -17,16 +17,13 @@
|
|||
"""
|
||||
|
||||
import datetime
|
||||
|
||||
from monascaclient import exc as monasca_exc
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
from oslo_utils import netutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
import eventlet
|
||||
from eventlet.queue import Empty
|
||||
|
||||
import ceilometer
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import monasca_client
|
||||
|
@ -41,10 +38,6 @@ OPTS = [
|
|||
default=300,
|
||||
help='Default period (in seconds) to use for querying stats '
|
||||
'in case no period specified in the stats API call.'),
|
||||
cfg.IntOpt('query_concurrency_limit',
|
||||
default=30,
|
||||
help='Number of concurrent queries to use for querying '
|
||||
'Monasca API'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OPTS, group='monasca')
|
||||
|
@ -120,8 +113,7 @@ class Connection(base.Connection):
|
|||
:param value_meta: metadata from monasca
|
||||
:returns: True for matched, False for not matched
|
||||
"""
|
||||
if (len(query) > 0 and
|
||||
(len(value_meta) == 0 or
|
||||
if (query and (len(value_meta) == 0 or
|
||||
not set(query.items()).issubset(set(value_meta.items())))):
|
||||
return False
|
||||
else:
|
||||
|
@ -183,7 +175,6 @@ class Connection(base.Connection):
|
|||
"""
|
||||
if limit == 0:
|
||||
return
|
||||
# TODO(Implement limit correctly)
|
||||
|
||||
q = {}
|
||||
if metaquery:
|
||||
|
@ -222,6 +213,7 @@ class Connection(base.Connection):
|
|||
_search_args = {k: v for k, v in _search_args.items()
|
||||
if v is not None}
|
||||
|
||||
result_count = 0
|
||||
for metric in self.mc.metrics_list(
|
||||
**dict(dimensions=dims_filter)):
|
||||
_search_args['name'] = metric['name']
|
||||
|
@ -235,6 +227,8 @@ class Connection(base.Connection):
|
|||
if not self._match_metaquery_to_value_meta(q, vm):
|
||||
continue
|
||||
if d.get('resource_id'):
|
||||
result_count += 1
|
||||
|
||||
yield api_models.Resource(
|
||||
resource_id=d.get('resource_id'),
|
||||
first_sample_timestamp=(
|
||||
|
@ -243,8 +237,12 @@ class Connection(base.Connection):
|
|||
project_id=d.get('project_id'),
|
||||
source=d.get('source'),
|
||||
user_id=d.get('user_id'),
|
||||
metadata=m['value_meta'],
|
||||
metadata=m['value_meta']
|
||||
)
|
||||
|
||||
if result_count == limit:
|
||||
return
|
||||
|
||||
except monasca_exc.HTTPConflict:
|
||||
pass
|
||||
|
||||
|
@ -296,125 +294,13 @@ class Connection(base.Connection):
|
|||
source=metric['dimensions'].get('source'),
|
||||
user_id=metric['dimensions'].get('user_id'))
|
||||
|
||||
def get_measurements(self, result_queue, metric_name, metric_dimensions,
|
||||
meta_q, start_ts, end_ts, start_op, end_op, limit):
|
||||
|
||||
start_ts = timeutils.isotime(start_ts)
|
||||
end_ts = timeutils.isotime(end_ts)
|
||||
|
||||
_search_args = dict(name=metric_name,
|
||||
start_time=start_ts,
|
||||
start_timestamp_op=start_op,
|
||||
end_time=end_ts,
|
||||
end_timestamp_op=end_op,
|
||||
merge_metrics=False,
|
||||
limit=limit,
|
||||
dimensions=metric_dimensions)
|
||||
|
||||
_search_args = {k: v for k, v in _search_args.items()
|
||||
if v is not None}
|
||||
|
||||
for sample in self.mc.measurements_list(**_search_args):
|
||||
LOG.debug(_('Retrieved sample: %s'), sample)
|
||||
|
||||
d = sample['dimensions']
|
||||
for measurement in sample['measurements']:
|
||||
meas_dict = self._convert_to_dict(measurement,
|
||||
sample['columns'])
|
||||
vm = meas_dict['value_meta']
|
||||
if not self._match_metaquery_to_value_meta(meta_q, vm):
|
||||
continue
|
||||
result_queue.put(api_models.Sample(
|
||||
source=d.get('source'),
|
||||
counter_name=sample['name'],
|
||||
counter_type=d.get('type'),
|
||||
counter_unit=d.get('unit'),
|
||||
counter_volume=meas_dict['value'],
|
||||
user_id=d.get('user_id'),
|
||||
project_id=d.get('project_id'),
|
||||
resource_id=d.get('resource_id'),
|
||||
timestamp=timeutils.parse_isotime(meas_dict['timestamp']),
|
||||
resource_metadata=meas_dict['value_meta'],
|
||||
message_id=sample['id'],
|
||||
message_signature='',
|
||||
recorded_at=(
|
||||
timeutils.parse_isotime(meas_dict['timestamp']))))
|
||||
|
||||
def get_next_time_delta(self, start, end, delta):
|
||||
# Gets next time window
|
||||
curr = start
|
||||
while curr < end:
|
||||
next = curr + delta
|
||||
yield curr, next
|
||||
curr = next
|
||||
|
||||
def get_next_task_args(self, start_timestamp=None, end_timestamp=None,
|
||||
delta=None, **kwargs):
|
||||
|
||||
# Yields next set of measurement related args
|
||||
metrics = self.mc.metrics_list(**kwargs)
|
||||
has_ts = start_timestamp and end_timestamp and delta
|
||||
if has_ts:
|
||||
for start, end in self.get_next_time_delta(
|
||||
start_timestamp,
|
||||
end_timestamp,
|
||||
delta):
|
||||
for metric in metrics:
|
||||
task = {'metric': metric['name'],
|
||||
'dimension': metric['dimensions'],
|
||||
'start_ts': start,
|
||||
'end_ts': end}
|
||||
LOG.debug(_('next task is : %s'), task)
|
||||
yield task
|
||||
else:
|
||||
for metric in metrics:
|
||||
task = {'metric': metric['name'],
|
||||
'dimension': metric['dimensions']
|
||||
}
|
||||
LOG.debug(_('next task is : %s'), task)
|
||||
yield task
|
||||
|
||||
def has_more_results(self, result_queue, t_pool):
|
||||
if result_queue.empty() and t_pool.pool.running() == 0:
|
||||
return False
|
||||
return True
|
||||
|
||||
def fetch_from_queue(self, result_queue, t_pool):
|
||||
# Fetches result from queue in non-blocking way
|
||||
try:
|
||||
result = result_queue.get_nowait()
|
||||
LOG.debug(_('Retrieved result : %s'), result)
|
||||
return result
|
||||
except Empty:
|
||||
# if no data in queue, yield to work threads
|
||||
# to give them a chance
|
||||
if t_pool.pool.running() > 0:
|
||||
eventlet.sleep(0)
|
||||
|
||||
def get_results(self, result_queue, t_pool, limit=None, result_count=None):
|
||||
# Inspect and yield results
|
||||
if limit:
|
||||
while result_count < limit:
|
||||
if not self.has_more_results(result_queue, t_pool):
|
||||
break
|
||||
result = self.fetch_from_queue(result_queue, t_pool)
|
||||
if result:
|
||||
yield result
|
||||
result_count += 1
|
||||
else:
|
||||
while True:
|
||||
if not self.has_more_results(result_queue, t_pool):
|
||||
break
|
||||
result = self.fetch_from_queue(result_queue, t_pool)
|
||||
if result:
|
||||
yield result
|
||||
|
||||
def get_samples(self, sample_filter, limit=None):
|
||||
"""Return an iterable of dictionaries containing sample information.
|
||||
|
||||
{
|
||||
'source': source of the resource,
|
||||
'counter_name': name of the resource,
|
||||
'counter_name': name of the resource,if groupby:
|
||||
raise ceilometer.NotImplementedError('Groupby not implemented')
|
||||
'counter_type': type of the sample (gauge, delta, cumulative),
|
||||
'counter_unit': unit of the sample,
|
||||
'counter_volume': volume of the sample,
|
||||
|
@ -431,10 +317,9 @@ class Connection(base.Connection):
|
|||
:param sample_filter: constraints for the sample search.
|
||||
:param limit: Maximum number of results to return.
|
||||
"""
|
||||
# Initialize pool of green work threads and queue to handle results
|
||||
thread_pool = os_service.threadgroup.ThreadGroup(
|
||||
thread_pool_size=cfg.CONF.monasca.query_concurrency_limit)
|
||||
result_queue = eventlet.queue.Queue()
|
||||
|
||||
if limit == 0:
|
||||
return
|
||||
|
||||
if not sample_filter or not sample_filter.meter:
|
||||
raise ceilometer.NotImplementedError(
|
||||
|
@ -469,9 +354,6 @@ class Connection(base.Connection):
|
|||
if not sample_filter.end_timestamp:
|
||||
sample_filter.end_timestamp = datetime.datetime.utcnow()
|
||||
|
||||
delta = sample_filter.end_timestamp - sample_filter.start_timestamp
|
||||
delta = delta / cfg.CONF.monasca.query_concurrency_limit
|
||||
|
||||
_dimensions = dict(
|
||||
user_id=sample_filter.user,
|
||||
project_id=sample_filter.project,
|
||||
|
@ -484,36 +366,51 @@ class Connection(base.Connection):
|
|||
_metric_args = dict(name=sample_filter.meter,
|
||||
dimensions=_dimensions)
|
||||
|
||||
if limit:
|
||||
result_count = 0
|
||||
start_ts = timeutils.isotime(sample_filter.start_timestamp)
|
||||
end_ts = timeutils.isotime(sample_filter.end_timestamp)
|
||||
|
||||
for task_cnt, task in enumerate(self.get_next_task_args(
|
||||
sample_filter.start_timestamp, sample_filter.end_timestamp,
|
||||
delta, **_metric_args)):
|
||||
# Spawn query_concurrency_limit number of green threads
|
||||
# simultaneously to fetch measurements
|
||||
thread_pool.add_thread(self.get_measurements,
|
||||
result_queue,
|
||||
task['metric'],
|
||||
task['dimension'],
|
||||
q,
|
||||
task['start_ts'],
|
||||
task['end_ts'],
|
||||
sample_filter.start_timestamp_op,
|
||||
sample_filter.end_timestamp_op,
|
||||
limit)
|
||||
# For every query_conncurrency_limit set of tasks,
|
||||
# consume data from queue and yield before moving on to
|
||||
# next set of tasks.
|
||||
if (task_cnt + 1) % cfg.CONF.monasca.query_concurrency_limit == 0:
|
||||
for result in self.get_results(result_queue, thread_pool,
|
||||
limit,
|
||||
result_count=result_count if
|
||||
limit else None):
|
||||
yield result
|
||||
_search_args = dict(
|
||||
start_time=start_ts,
|
||||
start_timestamp_op=sample_filter.start_timestamp_op,
|
||||
end_time=end_ts,
|
||||
end_timestamp_op=sample_filter.end_timestamp_op,
|
||||
merge_metrics=False
|
||||
)
|
||||
|
||||
# Shutdown threadpool
|
||||
thread_pool.stop()
|
||||
result_count = 0
|
||||
for metric in self.mc.metrics_list(
|
||||
**_metric_args):
|
||||
_search_args['name'] = metric['name']
|
||||
_search_args['dimensions'] = metric['dimensions']
|
||||
_search_args = {k: v for k, v in _search_args.items()
|
||||
if v is not None}
|
||||
|
||||
for sample in self.mc.measurements_list(**_search_args):
|
||||
d = sample['dimensions']
|
||||
for meas in sample['measurements']:
|
||||
m = self._convert_to_dict(
|
||||
meas, sample['columns'])
|
||||
vm = m['value_meta']
|
||||
if not self._match_metaquery_to_value_meta(q, vm):
|
||||
continue
|
||||
result_count += 1
|
||||
yield api_models.Sample(
|
||||
source=d.get('source'),
|
||||
counter_name=sample['name'],
|
||||
counter_type=d.get('type'),
|
||||
counter_unit=d.get('unit'),
|
||||
counter_volume=m['value'],
|
||||
user_id=d.get('user_id'),
|
||||
project_id=d.get('project_id'),
|
||||
resource_id=d.get('resource_id'),
|
||||
timestamp=timeutils.parse_isotime(m['timestamp']),
|
||||
resource_metadata=m['value_meta'],
|
||||
message_id=sample['id'],
|
||||
message_signature='',
|
||||
recorded_at=(timeutils.parse_isotime(m['timestamp'])))
|
||||
|
||||
if result_count == limit:
|
||||
return
|
||||
|
||||
def get_meter_statistics(self, filter, period=None, groupby=None,
|
||||
aggregate=None):
|
||||
|
@ -610,12 +507,10 @@ class Connection(base.Connection):
|
|||
dimensions=dims_filter)
|
||||
group_stats_list = []
|
||||
|
||||
for task_cnt, task in enumerate(
|
||||
self.get_next_task_args(**_metric_args)):
|
||||
|
||||
for metric in self.mc.metrics_list(**_metric_args):
|
||||
_search_args = dict(
|
||||
name=task['metric'],
|
||||
dimensions=task['dimension'],
|
||||
name=metric['name'],
|
||||
dimensions=metric['dimensions'],
|
||||
start_time=filter.start_timestamp,
|
||||
end_time=filter.end_timestamp,
|
||||
period=period,
|
||||
|
|
|
@ -28,6 +28,19 @@ from ceilometer.storage import impl_monasca
|
|||
|
||||
|
||||
class TestGetResources(base.BaseTestCase):
|
||||
|
||||
dummy_get_resources_mocked_return_value = (
|
||||
[{u'dimensions': {},
|
||||
u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}]],
|
||||
u'id': u'2015-04-14T18:42:31Z',
|
||||
u'columns': [u'timestamp', u'value', u'value_meta'],
|
||||
u'name': u'image'}])
|
||||
|
||||
def setUp(self):
|
||||
super(TestGetResources, self).setUp()
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
self.CONF([], project='ceilometer', validate_default_values=True)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_not_implemented_params(self, mock_mdf):
|
||||
with mock.patch("ceilometer.monasca_client.Client"):
|
||||
|
@ -69,9 +82,10 @@ class TestGetResources(base.BaseTestCase):
|
|||
'dimensions': {}}
|
||||
]
|
||||
kwargs = dict(source='openstack')
|
||||
list(conn.get_resources(**kwargs))
|
||||
|
||||
ml_mock = mock_client().measurements_list
|
||||
ml_mock.return_value = (
|
||||
TestGetResources.dummy_get_resources_mocked_return_value)
|
||||
list(conn.get_resources(**kwargs))
|
||||
self.assertEqual(2, ml_mock.call_count)
|
||||
self.assertEqual(dict(dimensions={},
|
||||
name='metric1',
|
||||
|
@ -79,6 +93,42 @@ class TestGetResources(base.BaseTestCase):
|
|||
start_time='1970-01-01T00:00:00Z'),
|
||||
ml_mock.call_args_list[0][1])
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_resources_limit(self, mdf_mock):
|
||||
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
|
||||
conn = impl_monasca.Connection("127.0.0.1:8080")
|
||||
|
||||
mnl_mock = mock_client().metrics_list
|
||||
mnl_mock.return_value = [{'name': 'metric1',
|
||||
'dimensions': {'resource_id': 'abcd'}},
|
||||
{'name': 'metric2',
|
||||
'dimensions': {'resource_id': 'abcd'}}
|
||||
]
|
||||
|
||||
dummy_get_resources_mocked_return_value = (
|
||||
[{u'dimensions': {u'resource_id': u'abcd'},
|
||||
u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}],
|
||||
[u'2015-04-15T17:52:31Z', 2.0, {}],
|
||||
[u'2015-04-16T17:52:31Z', 3.0, {}]],
|
||||
u'id': u'2015-04-14T18:42:31Z',
|
||||
u'columns': [u'timestamp', u'value', u'value_meta'],
|
||||
u'name': u'image'}])
|
||||
|
||||
ml_mock = mock_client().measurements_list
|
||||
ml_mock.return_value = (
|
||||
TestGetSamples.dummy_metrics_mocked_return_value
|
||||
)
|
||||
ml_mock = mock_client().measurements_list
|
||||
ml_mock.return_value = (
|
||||
dummy_get_resources_mocked_return_value)
|
||||
|
||||
sample_filter = storage.SampleFilter(
|
||||
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
|
||||
resources = list(conn.get_resources(sample_filter, limit=2))
|
||||
self.assertEqual(2, len(resources))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
self.assertEqual(2, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_resources_simple_metaquery(self, mock_mdf):
|
||||
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
|
||||
|
@ -92,8 +142,12 @@ class TestGetResources(base.BaseTestCase):
|
|||
'value_meta': {'key': 'value2'}},
|
||||
]
|
||||
kwargs = dict(metaquery={'metadata.key': 'value1'})
|
||||
list(conn.get_resources(**kwargs))
|
||||
|
||||
ml_mock = mock_client().measurements_list
|
||||
ml_mock.return_value = (
|
||||
TestGetResources.dummy_get_resources_mocked_return_value)
|
||||
list(conn.get_resources(**kwargs))
|
||||
|
||||
self.assertEqual(2, ml_mock.call_count)
|
||||
self.assertEqual(dict(dimensions={},
|
||||
name='metric2',
|
||||
|
@ -155,7 +209,6 @@ class TestGetSamples(base.BaseTestCase):
|
|||
super(TestGetSamples, self).setUp()
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
self.CONF([], project='ceilometer', validate_default_values=True)
|
||||
self.CONF.set_override('query_concurrency_limit', 3, group='monasca')
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_not_implemented_params(self, mdf_mock):
|
||||
|
@ -177,37 +230,10 @@ class TestGetSamples(base.BaseTestCase):
|
|||
self.assertRaises(ceilometer.NotImplementedError,
|
||||
lambda: list(conn.get_samples(sample_filter)))
|
||||
|
||||
def get_concurrent_task_args(self, conn, start_time, end_time,
|
||||
sample_filter, dimensions=None, limit=None):
|
||||
|
||||
delta = ((end_time - start_time) /
|
||||
self.CONF.monasca.query_concurrency_limit)
|
||||
expected_args_list = []
|
||||
for start, end in impl_monasca.Connection.get_next_time_delta(
|
||||
conn, start_time, end_time, delta):
|
||||
if limit:
|
||||
expected_args_list.append(dict(
|
||||
dimensions=dimensions if dimensions else {},
|
||||
start_time=timeutils.isotime(start),
|
||||
start_timestamp_op=sample_filter.start_timestamp_op,
|
||||
merge_metrics=False, name='specific meter',
|
||||
limit=limit,
|
||||
end_time=timeutils.isotime(end)))
|
||||
else:
|
||||
expected_args_list.append(dict(
|
||||
dimensions=dimensions if dimensions else {},
|
||||
start_time=timeutils.isotime(start),
|
||||
start_timestamp_op=sample_filter.start_timestamp_op,
|
||||
merge_metrics=False, name='specific meter',
|
||||
end_time=timeutils.isotime(end)))
|
||||
|
||||
return expected_args_list
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_name(self, mdf_mock):
|
||||
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
|
||||
conn = impl_monasca.Connection("127.0.0.1:8080")
|
||||
|
||||
metrics_list_mock = mock_client().metrics_list
|
||||
metrics_list_mock.return_value = (
|
||||
TestGetSamples.dummy_metrics_mocked_return_value
|
||||
|
@ -215,22 +241,17 @@ class TestGetSamples(base.BaseTestCase):
|
|||
ml_mock = mock_client().measurements_list
|
||||
ml_mock.return_value = (
|
||||
TestGetSamples.dummy_get_samples_mocked_return_value)
|
||||
|
||||
start_time = datetime.datetime(1970, 1, 1)
|
||||
end_time = datetime.datetime(2015, 4, 20)
|
||||
|
||||
sample_filter = storage.SampleFilter(
|
||||
meter='specific meter',
|
||||
end_timestamp=timeutils.isotime(end_time))
|
||||
|
||||
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
|
||||
list(conn.get_samples(sample_filter))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
|
||||
expected_args_list = self.get_concurrent_task_args(
|
||||
conn, start_time, end_time, sample_filter)
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
(self.assertIn(call_arg[1], expected_args_list)
|
||||
for call_arg in ml_mock.call_args_list)
|
||||
self.assertEqual(dict(
|
||||
dimensions={},
|
||||
start_time='1970-01-01T00:00:00Z',
|
||||
merge_metrics=False, name='specific meter',
|
||||
end_time='2015-04-20T00:00:00Z'),
|
||||
ml_mock.call_args[1])
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_start_timestamp_filter(self, mdf_mock):
|
||||
|
@ -246,7 +267,6 @@ class TestGetSamples(base.BaseTestCase):
|
|||
TestGetSamples.dummy_get_samples_mocked_return_value)
|
||||
|
||||
start_time = datetime.datetime(2015, 3, 20)
|
||||
end_time = datetime.datetime.utcnow()
|
||||
|
||||
sample_filter = storage.SampleFilter(
|
||||
meter='specific meter',
|
||||
|
@ -254,14 +274,7 @@ class TestGetSamples(base.BaseTestCase):
|
|||
start_timestamp_op='ge')
|
||||
list(conn.get_samples(sample_filter))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
|
||||
expected_args_list = self.get_concurrent_task_args(conn,
|
||||
start_time,
|
||||
end_time,
|
||||
sample_filter)
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
(self.assertIn(call_arg[1], expected_args_list)
|
||||
for call_arg in ml_mock.call_args_list)
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_limit(self, mdf_mock):
|
||||
|
@ -269,30 +282,29 @@ class TestGetSamples(base.BaseTestCase):
|
|||
conn = impl_monasca.Connection("127.0.0.1:8080")
|
||||
|
||||
metrics_list_mock = mock_client().metrics_list
|
||||
|
||||
dummy_get_samples_mocked_return_value = (
|
||||
[{u'dimensions': {},
|
||||
u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}],
|
||||
[u'2015-04-15T17:52:31Z', 2.0, {}],
|
||||
[u'2015-04-16T17:52:31Z', 3.0, {}]],
|
||||
u'id': u'2015-04-14T18:42:31Z',
|
||||
u'columns': [u'timestamp', u'value', u'value_meta'],
|
||||
u'name': u'image'}])
|
||||
|
||||
metrics_list_mock.return_value = (
|
||||
TestGetSamples.dummy_metrics_mocked_return_value
|
||||
)
|
||||
ml_mock = mock_client().measurements_list
|
||||
ml_mock.return_value = (
|
||||
TestGetSamples.dummy_get_samples_mocked_return_value)
|
||||
|
||||
start_time = datetime.datetime(1970, 1, 1)
|
||||
end_time = datetime.datetime(2015, 4, 20)
|
||||
dummy_get_samples_mocked_return_value)
|
||||
|
||||
sample_filter = storage.SampleFilter(
|
||||
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
|
||||
list(conn.get_samples(sample_filter, limit=50))
|
||||
samples = list(conn.get_samples(sample_filter, limit=2))
|
||||
self.assertEqual(2, len(samples))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
|
||||
expected_args_list = self.get_concurrent_task_args(conn,
|
||||
start_time,
|
||||
end_time,
|
||||
sample_filter,
|
||||
limit=50)
|
||||
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
(self.assertIn(call_arg[1], expected_args_list)
|
||||
for call_arg in ml_mock.call_args_list)
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_project_filter(self, mock_mdf):
|
||||
|
@ -309,20 +321,11 @@ class TestGetSamples(base.BaseTestCase):
|
|||
ml_mock.return_value = (
|
||||
TestGetSamples.dummy_get_samples_mocked_return_value)
|
||||
|
||||
start_time = datetime.datetime(1970, 1, 1)
|
||||
end_time = datetime.datetime.utcnow()
|
||||
sample_filter = storage.SampleFilter(meter='specific meter',
|
||||
project='specific project')
|
||||
list(conn.get_samples(sample_filter))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
|
||||
expected_args_list = self.get_concurrent_task_args(
|
||||
conn, start_time, end_time, sample_filter,
|
||||
dimensions=dict(project_id=sample_filter.project))
|
||||
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
(self.assertIn(call_arg[1], expected_args_list)
|
||||
for call_arg in ml_mock.call_args_list)
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_resource_filter(self, mock_mdf):
|
||||
|
@ -338,20 +341,11 @@ class TestGetSamples(base.BaseTestCase):
|
|||
ml_mock.return_value = (
|
||||
TestGetSamples.dummy_get_samples_mocked_return_value)
|
||||
|
||||
start_time = datetime.datetime(1970, 1, 1)
|
||||
end_time = datetime.datetime.utcnow()
|
||||
sample_filter = storage.SampleFilter(meter='specific meter',
|
||||
resource='specific resource')
|
||||
list(conn.get_samples(sample_filter))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
|
||||
expected_args_list = self.get_concurrent_task_args(
|
||||
conn, start_time, end_time, sample_filter,
|
||||
dimensions=dict(resource_id=sample_filter.resource))
|
||||
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
(self.assertIn(call_arg[1], expected_args_list)
|
||||
for call_arg in ml_mock.call_args_list)
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_source_filter(self, mdf_mock):
|
||||
|
@ -367,20 +361,11 @@ class TestGetSamples(base.BaseTestCase):
|
|||
ml_mock.return_value = (
|
||||
TestGetSamples.dummy_get_samples_mocked_return_value)
|
||||
|
||||
start_time = datetime.datetime(1970, 1, 1)
|
||||
end_time = datetime.datetime.utcnow()
|
||||
sample_filter = storage.SampleFilter(meter='specific meter',
|
||||
source='specific source')
|
||||
list(conn.get_samples(sample_filter))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
|
||||
expected_args_list = self.get_concurrent_task_args(
|
||||
conn, start_time, end_time, sample_filter,
|
||||
dimensions=dict(source=sample_filter.source))
|
||||
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
(self.assertIn(call_arg[1], expected_args_list)
|
||||
for call_arg in ml_mock.call_args_list)
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_simple_metaquery(self, mdf_mock):
|
||||
|
@ -394,20 +379,12 @@ class TestGetSamples(base.BaseTestCase):
|
|||
ml_mock.return_value = (
|
||||
TestGetSamples.dummy_get_samples_mocked_return_value)
|
||||
|
||||
start_time = datetime.datetime(1970, 1, 1)
|
||||
end_time = datetime.datetime.utcnow()
|
||||
sample_filter = storage.SampleFilter(
|
||||
meter='specific meter',
|
||||
metaquery={'metadata.key': u'value'})
|
||||
list(conn.get_samples(sample_filter))
|
||||
self.assertEqual(True, ml_mock.called)
|
||||
|
||||
expected_args_list = self.get_concurrent_task_args(
|
||||
conn, start_time, end_time, sample_filter)
|
||||
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
(self.assertIn(call_arg[1], expected_args_list)
|
||||
for call_arg in ml_mock.call_args_list)
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||
def test_get_samples_results(self, mdf_mock):
|
||||
|
@ -480,7 +457,7 @@ class TestGetSamples(base.BaseTestCase):
|
|||
get('measurements')[0][0]))
|
||||
self.assertEqual(results[0].user_id, None)
|
||||
|
||||
self.assertEqual(3, ml_mock.call_count)
|
||||
self.assertEqual(1, ml_mock.call_count)
|
||||
|
||||
|
||||
class MeterStatisticsTest(base.BaseTestCase):
|
||||
|
|
Loading…
Reference in New Issue