get_samples uses concurrent querying

Instead of sequentially querying for
measurements of a metric with different
dimensions, this patch queries them
at the same time using multi-tasking.

Change-Id: Ief0d588ed1cc6d6ed0d1a198e888d545c32c188c
This commit is contained in:
Rohit Jaiswal 2015-10-09 20:18:43 +00:00
parent a4a1489eb2
commit fa6a78c71d
2 changed files with 324 additions and 108 deletions

View File

@ -23,9 +23,14 @@ from oslo_log import log
from oslo_utils import netutils
from oslo_utils import timeutils
import eventlet
from eventlet.queue import Empty
import ceilometer
from ceilometer.i18n import _
from ceilometer import monasca_client
from ceilometer.openstack.common import threadgroup
from ceilometer.publisher.monasca_data_filter import MonascaDataFilter
from ceilometer.storage import base
from ceilometer.storage import models as api_models
@ -37,7 +42,12 @@ OPTS = [
default=300,
help='Default period (in seconds) to use for querying stats '
'in case no period specified in the stats API call.'),
cfg.IntOpt('query_concurrency_limit',
default=30,
help='Number of concurrent queries to use for querying '
'Monasca API'),
]
cfg.CONF.register_opts(OPTS, group='monasca')
LOG = log.getLogger(__name__)
@ -287,6 +297,108 @@ class Connection(base.Connection):
source=metric['dimensions'].get('source'),
user_id=metric['dimensions'].get('user_id'))
def get_measurements(self, result_queue, metric_name, metric_dimensions,
                     meta_q, start_ts, end_ts, start_op, end_op, limit):
    """Fetch measurements for one metric over one time window.

    Runs inside a worker green thread: every measurement that passes
    the metadata query is converted to an api_models.Sample and pushed
    onto result_queue for get_results() to consume.

    :param result_queue: queue shared with the consumer side.
    :param metric_name: name of the metric to query.
    :param metric_dimensions: dimensions identifying the metric.
    :param meta_q: metadata query constraints to match against.
    :param start_ts: window start (datetime, converted to ISO time).
    :param end_ts: window end (datetime, converted to ISO time).
    :param start_op: comparison operator for the window start.
    :param end_op: comparison operator for the window end.
    :param limit: maximum number of measurements to request, or None.
    """
    search_kwargs = dict(name=metric_name,
                         start_time=timeutils.isotime(start_ts),
                         start_timestamp_op=start_op,
                         end_time=timeutils.isotime(end_ts),
                         end_timestamp_op=end_op,
                         merge_metrics=False,
                         limit=limit,
                         dimensions=metric_dimensions)
    # Strip unset filters so only meaningful parameters reach the API.
    search_kwargs = {key: val for key, val in search_kwargs.items()
                     if val is not None}
    for sample in self.mc.measurements_list(**search_kwargs):
        LOG.debug(_('Retrieved sample: %s'), sample)
        dims = sample['dimensions']
        for measurement in sample['measurements']:
            meas_dict = self._convert_to_dict(measurement,
                                              sample['columns'])
            if not self._match_metaquery_to_value_meta(
                    meta_q, meas_dict['value_meta']):
                # Metadata query did not match; drop this measurement.
                continue
            ts = timeutils.parse_isotime(meas_dict['timestamp'])
            result_queue.put(api_models.Sample(
                source=dims.get('source'),
                counter_name=sample['name'],
                counter_type=dims.get('type'),
                counter_unit=dims.get('unit'),
                counter_volume=meas_dict['value'],
                user_id=dims.get('user_id'),
                project_id=dims.get('project_id'),
                resource_id=dims.get('resource_id'),
                timestamp=ts,
                resource_metadata=meas_dict['value_meta'],
                message_id=sample['id'],
                message_signature='',
                recorded_at=ts))
def get_next_time_delta(self, start, end, delta):
    """Yield consecutive (window_start, window_end) pairs of width delta.

    Windows are produced until window_start reaches end; the final
    window_end may overshoot end by up to delta.

    :param start: beginning of the overall range.
    :param end: end of the overall range (exclusive for window starts).
    :param delta: width of each window.
    """
    # 'window_end' replaces the original local 'next', which shadowed
    # the builtin of the same name.
    current = start
    while current < end:
        window_end = current + delta
        yield current, window_end
        current = window_end
def get_next_task_args(self, sample_filter, delta, **kwargs):
    """Yield one measurement-query task per (time window, metric) pair.

    :param sample_filter: filter carrying start/end timestamps.
    :param delta: width of each query time window.
    :param kwargs: arguments forwarded to metrics_list (name, dimensions).
    """
    # Materialize to a list: the metrics are re-iterated for every time
    # window below, which would silently yield nothing on the second
    # pass if the client ever returned a one-shot iterator.
    metrics = list(self.mc.metrics_list(**kwargs))
    for start, end in self.get_next_time_delta(
            sample_filter.start_timestamp,
            sample_filter.end_timestamp,
            delta):
        for metric in metrics:
            task = {'metric': metric['name'],
                    'dimension': metric['dimensions'],
                    'start_ts': start,
                    'end_ts': end}
            LOG.debug(_('next task is : %s'), task)
            yield task
def has_more_results(self, result_queue, t_pool):
    """Return True while results may still arrive.

    Results are possible while the queue holds data or any worker
    thread in the pool is still running.
    """
    drained = result_queue.empty() and t_pool.pool.running() == 0
    return not drained
def fetch_from_queue(self, result_queue, t_pool):
    """Fetch one result from the queue without blocking.

    Returns the result, or None when the queue is currently empty.
    On an empty queue, cooperatively yields to the worker green
    threads (if any are running) so they get a chance to produce data.
    """
    try:
        result = result_queue.get_nowait()
    except Empty:
        if t_pool.pool.running() > 0:
            # Yield the hub to the workers before the caller retries.
            eventlet.sleep(0)
        return None
    LOG.debug(_('Retrieved result : %s'), result)
    return result
def get_results(self, result_queue, t_pool, limit=None, result_count=None):
    """Yield results from result_queue until the worker pool drains.

    :param result_queue: queue populated by get_measurements workers.
    :param t_pool: thread pool whose running count signals completion.
    :param limit: optional overall cap on the number of results.
    :param result_count: results already yielded by earlier calls,
        counted against limit; None is treated as 0.
    """
    # Single loop replaces the original duplicated limit/no-limit
    # branches; also tolerates result_count=None when limit is set.
    count = result_count or 0
    while self.has_more_results(result_queue, t_pool):
        if limit is not None and count >= limit:
            # Caller's overall limit reached; stop consuming.
            break
        result = self.fetch_from_queue(result_queue, t_pool)
        if result:
            yield result
            count += 1
def get_samples(self, sample_filter, limit=None):
"""Return an iterable of dictionaries containing sample information.
@ -309,6 +421,10 @@ class Connection(base.Connection):
:param sample_filter: constraints for the sample search.
:param limit: Maximum number of results to return.
"""
# Initialize pool of green work threads and queue to handle results
thread_pool = threadgroup.ThreadGroup(
thread_pool_size=cfg.CONF.monasca.query_concurrency_limit)
result_queue = eventlet.queue.Queue()
if not sample_filter or not sample_filter.meter:
raise ceilometer.NotImplementedError(
@ -338,15 +454,13 @@ class Connection(base.Connection):
'in get_samples')
if not sample_filter.start_timestamp:
sample_filter.start_timestamp = \
timeutils.isotime(datetime.datetime(1970, 1, 1))
else:
sample_filter.start_timestamp = \
timeutils.isotime(sample_filter.start_timestamp)
sample_filter.start_timestamp = datetime.datetime(1970, 1, 1)
if sample_filter.end_timestamp:
sample_filter.end_timestamp = \
timeutils.isotime(sample_filter.end_timestamp)
if not sample_filter.end_timestamp:
sample_filter.end_timestamp = datetime.datetime.utcnow()
delta = sample_filter.end_timestamp - sample_filter.start_timestamp
delta = delta / cfg.CONF.monasca.query_concurrency_limit
_dimensions = dict(
user_id=sample_filter.user,
@ -357,44 +471,38 @@ class Connection(base.Connection):
_dimensions = {k: v for k, v in _dimensions.items() if v is not None}
_search_args = dict(name=sample_filter.meter,
start_time=sample_filter.start_timestamp,
start_timestamp_op=(
sample_filter.start_timestamp_op),
end_time=sample_filter.end_timestamp,
end_timestamp_op=sample_filter.end_timestamp_op,
limit=limit,
merge_metrics=True,
_metric_args = dict(name=sample_filter.meter,
dimensions=_dimensions)
_search_args = {k: v for k, v in _search_args.items()
if v is not None}
if limit:
result_count = 0
for sample in self.mc.measurements_list(**_search_args):
LOG.debug(_('Retrieved sample: %s'), sample)
for task_cnt, task in enumerate(self.get_next_task_args(
sample_filter, delta, **_metric_args)):
# Spawn query_concurrency_limit number of green threads
# simultaneously to fetch measurements
thread_pool.add_thread(self.get_measurements,
result_queue,
task['metric'],
task['dimension'],
q,
task['start_ts'],
task['end_ts'],
sample_filter.start_timestamp_op,
sample_filter.end_timestamp_op,
limit)
# For every query_conncurrency_limit set of tasks,
# consume data from queue and yield before moving on to
# next set of tasks.
if (task_cnt + 1) % cfg.CONF.monasca.query_concurrency_limit == 0:
for result in self.get_results(result_queue, thread_pool,
limit,
result_count=result_count if
limit else None):
yield result
d = sample['dimensions']
for measurement in sample['measurements']:
meas_dict = self._convert_to_dict(measurement,
sample['columns'])
vm = meas_dict['value_meta']
if not self._match_metaquery_to_value_meta(q, vm):
continue
yield api_models.Sample(
source=d.get('source'),
counter_name=sample['name'],
counter_type=d.get('type'),
counter_unit=d.get('unit'),
counter_volume=meas_dict['value'],
user_id=d.get('user_id'),
project_id=d.get('project_id'),
resource_id=d.get('resource_id'),
timestamp=timeutils.parse_isotime(meas_dict['timestamp']),
resource_metadata=meas_dict['value_meta'],
message_id=sample['id'],
message_signature='',
recorded_at=(
timeutils.parse_isotime(meas_dict['timestamp'])))
# Shutdown threadpool
thread_pool.stop()
def get_meter_statistics(self, filter, period=None, groupby=None,
aggregate=None):

View File

@ -14,8 +14,11 @@
# under the License.
import collections
import datetime
import dateutil.parser
import mock
from oslo_config import fixture as fixture_config
from oslo_utils import timeutils
from oslotest import base
import ceilometer
@ -149,6 +152,17 @@ class TestGetSamples(base.BaseTestCase):
u'columns': [u'timestamp', u'value', u'value_meta'],
u'name': u'image'}])
dummy_metrics_mocked_return_value = (
[{u'dimensions': {},
u'id': u'2015-04-14T18:42:31Z',
u'name': u'specific meter'}])
def setUp(self):
    super(TestGetSamples, self).setUp()
    # Load option definitions through an oslo.config fixture so
    # overrides are automatically reverted after each test.
    self.CONF = self.useFixture(fixture_config.Config()).conf
    self.CONF([], project='ceilometer', validate_default_values=True)
    # A small concurrency limit of 3 splits the query range into 3
    # time windows, so each test expects 3 measurements_list calls.
    self.CONF.set_override('query_concurrency_limit', 3, group='monasca')
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_not_implemented_params(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client"):
@ -169,162 +183,255 @@ class TestGetSamples(base.BaseTestCase):
self.assertRaises(ceilometer.NotImplementedError,
lambda: list(conn.get_samples(sample_filter)))
def get_concurrent_task_args(self, conn, start_time, end_time,
                             sample_filter, dimensions=None, limit=None):
    """Build the expected per-window measurements_list kwargs.

    Mirrors how Connection.get_samples splits [start_time, end_time)
    into query_concurrency_limit windows and queries each one.

    :param conn: Connection instance (only used as 'self' for the
        unbound get_next_time_delta call).
    :param start_time: overall query range start (datetime).
    :param end_time: overall query range end (datetime).
    :param sample_filter: filter providing start_timestamp_op.
    :param dimensions: expected dimensions dict, {} when None.
    :param limit: expected per-query limit; omitted from the kwargs
        when falsy.
    :returns: list of kwargs dicts, one per expected call.
    """
    delta = ((end_time - start_time) /
             self.CONF.monasca.query_concurrency_limit)
    expected_args_list = []
    for start, end in impl_monasca.Connection.get_next_time_delta(
            conn, start_time, end_time, delta):
        # Build the common kwargs once; 'limit' is the only key that
        # differs between the limited and unlimited cases.
        args = dict(
            dimensions=dimensions if dimensions else {},
            start_time=timeutils.isotime(start),
            start_timestamp_op=sample_filter.start_timestamp_op,
            merge_metrics=False, name='specific meter',
            end_time=timeutils.isotime(end))
        if limit:
            args['limit'] = limit
        expected_args_list.append(args)
    return expected_args_list
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_name(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
TestGetSamples.dummy_metrics_mocked_return_value
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime(2015, 4, 20)
sample_filter = storage.SampleFilter(
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
meter='specific meter',
end_timestamp=timeutils.isotime(end_time))
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
dimensions={},
start_time='1970-01-01T00:00:00Z',
merge_metrics=True, name='specific meter',
end_time=str(sample_filter.end_timestamp)),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_start_timestamp_filter(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
TestGetSamples.dummy_metrics_mocked_return_value
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(2015, 3, 20)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(
meter='specific meter',
start_timestamp='2015-03-20T00:00:00Z',
start_timestamp=timeutils.isotime(start_time),
start_timestamp_op='ge')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
dimensions={},
start_time=str(sample_filter.start_timestamp),
start_timestamp_op=sample_filter.start_timestamp_op,
merge_metrics=True, name='specific meter'),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_end_timestamp_filter(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
sample_filter = storage.SampleFilter(
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
dimensions={},
start_time='1970-01-01T00:00:00Z',
merge_metrics=True, name='specific meter',
end_time=str(sample_filter.end_timestamp)),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
expected_args_list = self.get_concurrent_task_args(conn,
start_time,
end_time,
sample_filter)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_limit(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
TestGetSamples.dummy_metrics_mocked_return_value
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime(2015, 4, 20)
sample_filter = storage.SampleFilter(
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
list(conn.get_samples(sample_filter, limit=50))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
dimensions={},
start_time='1970-01-01T00:00:00Z',
merge_metrics=True, name='specific meter', limit=50,
end_time=str(sample_filter.end_timestamp)),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
expected_args_list = self.get_concurrent_task_args(conn,
start_time,
end_time,
sample_filter,
limit=50)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_project_filter(self, mock_mdf):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
[{u'dimensions': dict(project_id='specific project'),
u'id': u'2015-04-14T18:42:31Z',
u'name': u'specific meter'}]
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(meter='specific meter',
project='specific project')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
start_time='1970-01-01T00:00:00Z',
merge_metrics=True, name='specific meter',
dimensions=dict(project_id=sample_filter.project)),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter,
dimensions=dict(project_id=sample_filter.project))
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_resource_filter(self, mock_mdf):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
[{u'dimensions': dict(resource_id='specific resource'),
u'id': u'2015-04-14T18:42:31Z',
u'name': u'specific meter'}]
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(meter='specific meter',
resource='specific resource')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
start_time='1970-01-01T00:00:00Z',
merge_metrics=True, name='specific meter',
dimensions=dict(resource_id=sample_filter.resource)),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter,
dimensions=dict(resource_id=sample_filter.resource))
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_source_filter(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
[{u'dimensions': dict(source='specific source'),
u'id': u'2015-04-14T18:42:31Z',
u'name': u'specific meter'}]
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(meter='specific meter',
source='specific source')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
start_time='1970-01-01T00:00:00Z',
merge_metrics=True, name='specific meter',
dimensions=dict(source=sample_filter.source)),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter,
dimensions=dict(source=sample_filter.source))
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_simple_metaquery(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
TestGetSamples.dummy_metrics_mocked_return_value
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(
meter='specific meter',
metaquery={'metadata.key': u'value'})
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
self.assertEqual(dict(
dimensions={},
start_time='1970-01-01T00:00:00Z',
merge_metrics=True, name='specific meter'),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_results(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
[{u'dimensions': {
'source': 'some source',
'project_id': 'some project ID',
'resource_id': 'some resource ID',
'type': 'some type',
'unit': 'some unit'},
u'id': u'2015-04-14T18:42:31Z',
u'name': u'image'}]
)
ml_mock = mock_client().measurements_list
# TODO(this test case needs more work)
ml_mock.return_value = (
[{u'dimensions': {
'source': 'some source',
@ -338,6 +445,7 @@ class TestGetSamples(base.BaseTestCase):
u'id': u'2015-04-14T18:42:31Z',
u'columns': [u'timestamp', u'value', u'value_meta'],
u'name': u'image'}])
sample_filter = storage.SampleFilter(
meter='specific meter',
start_timestamp='2015-03-20T00:00:00Z')
@ -378,7 +486,7 @@ class TestGetSamples(base.BaseTestCase):
get('measurements')[0][0]))
self.assertEqual(results[0].user_id, None)
self.assertEqual(1, ml_mock.call_count)
self.assertEqual(3, ml_mock.call_count)
class MeterStatisticsTest(base.BaseTestCase):