improve performance of resource-list in sql

simplify the query to retrieve list of resources. it is much simpler
to query against raw Sample data then to join against Resource table.

also, add meter_links param to allow ability to disable link generation of
relate meters.

Change-Id: Ia4ceafc568c4a2e4c8e6b7586511135627b9335e
Closes-Bug: #1264434
Implements: blueprint big-data-sql
This commit is contained in:
Gordon Chung 2014-03-13 13:00:58 -04:00 committed by Gerrit Code Review
parent e8a7eed15c
commit 13e2ebcfb0
3 changed files with 122 additions and 132 deletions

View File

@ -1425,14 +1425,15 @@ class Resource(_Base):
class ResourcesController(rest.RestController): class ResourcesController(rest.RestController):
"""Works on resources.""" """Works on resources."""
def _resource_links(self, resource_id): def _resource_links(self, resource_id, meter_links=1):
links = [_make_link('self', pecan.request.host_url, 'resources', links = [_make_link('self', pecan.request.host_url, 'resources',
resource_id)] resource_id)]
for meter in pecan.request.storage_conn.get_meters(resource= if meter_links:
resource_id): for meter in pecan.request.storage_conn.get_meters(resource=
query = {'field': 'resource_id', 'value': resource_id} resource_id):
links.append(_make_link(meter.name, pecan.request.host_url, query = {'field': 'resource_id', 'value': resource_id}
'meters', meter.name, query=query)) links.append(_make_link(meter.name, pecan.request.host_url,
'meters', meter.name, query=query))
return links return links
@wsme_pecan.wsexpose(Resource, unicode) @wsme_pecan.wsexpose(Resource, unicode)
@ -1449,16 +1450,18 @@ class ResourcesController(rest.RestController):
return Resource.from_db_and_links(resources[0], return Resource.from_db_and_links(resources[0],
self._resource_links(resource_id)) self._resource_links(resource_id))
@wsme_pecan.wsexpose([Resource], [Query]) @wsme_pecan.wsexpose([Resource], [Query], int)
def get_all(self, q=[]): def get_all(self, q=[], meter_links=1):
"""Retrieve definitions of all of the resources. """Retrieve definitions of all of the resources.
:param q: Filter rules for the resources to be returned. :param q: Filter rules for the resources to be returned.
:param meter_links: option to include related meter links
""" """
kwargs = _query_to_kwargs(q, pecan.request.storage_conn.get_resources) kwargs = _query_to_kwargs(q, pecan.request.storage_conn.get_resources)
resources = [ resources = [
Resource.from_db_and_links(r, Resource.from_db_and_links(r,
self._resource_links(r.resource_id)) self._resource_links(r.resource_id,
meter_links))
for r in pecan.request.storage_conn.get_resources(**kwargs)] for r in pecan.request.storage_conn.get_resources(**kwargs)]
return resources return resources

View File

@ -444,90 +444,67 @@ class Connection(base.Connection):
:param resource: Optional resource filter. :param resource: Optional resource filter.
:param pagination: Optional pagination query. :param pagination: Optional pagination query.
""" """
# We probably want to raise these early, since we don't know from here
# if they will be handled. We don't want extra wait or work for it to
# just fail.
if pagination: if pagination:
raise NotImplementedError('Pagination not implemented') raise NotImplementedError('Pagination not implemented')
def _apply_filters(query):
#TODO(gordc) this should be merged with make_query_from_filter
for column, value in [(models.Sample.resource_id, resource),
(models.Sample.user_id, user),
(models.Sample.project_id, project)]:
if value:
query = query.filter(column == value)
if source:
query = query.filter(
models.Sample.sources.any(id=source))
if metaquery:
query = apply_metaquery_filter(session, query, metaquery)
if start_timestamp:
if start_timestamp_op == 'gt':
query = query.filter(
models.Sample.timestamp > start_timestamp)
else:
query = query.filter(
models.Sample.timestamp >= start_timestamp)
if end_timestamp:
if end_timestamp_op == 'le':
query = query.filter(
models.Sample.timestamp <= end_timestamp)
else:
query = query.filter(
models.Sample.timestamp < end_timestamp)
return query
session = self._get_db_session() session = self._get_db_session()
# get list of resource_ids
res_q = session.query(distinct(models.Sample.resource_id))
res_q = _apply_filters(res_q)
# (thomasm) We need to get the max timestamp first, since that's the for res_id in res_q.all():
# most accurate. We also need to filter down in the subquery to # get latest Sample
# constrain what we have to JOIN on later. max_q = session.query(models.Sample)\
ts_subquery = session.query( .filter(models.Sample.resource_id == res_id[0])
models.Sample.resource_id, max_q = _apply_filters(max_q)
func.max(models.Sample.timestamp).label("max_ts"), max_q = max_q.order_by(models.Sample.timestamp.desc(),
func.min(models.Sample.timestamp).label("min_ts") models.Sample.id.desc()).limit(1)
).group_by(models.Sample.resource_id)
# Here are the basic 'eq' operation filters for the sample data. # get the min timestamp value.
for column, value in [(models.Sample.resource_id, resource), min_q = session.query(models.Sample.timestamp)\
(models.Sample.user_id, user), .filter(models.Sample.resource_id == res_id[0])
(models.Sample.project_id, project)]: min_q = _apply_filters(min_q)
if value: min_q = min_q.order_by(models.Sample.timestamp.asc()).limit(1)
ts_subquery = ts_subquery.filter(column == value)
if source: sample = max_q.first()
ts_subquery = ts_subquery.filter( if sample:
models.Sample.sources.any(id=source)) yield api_models.Resource(
resource_id=sample.resource_id,
if metaquery: project_id=sample.project_id,
ts_subquery = apply_metaquery_filter(session, first_sample_timestamp=min_q.first().timestamp,
ts_subquery, last_sample_timestamp=sample.timestamp,
metaquery) source=sample.sources[0].id,
user_id=sample.user_id,
# Here we limit the samples being used to a specific time period, metadata=sample.resource_metadata
# if requested. )
if start_timestamp:
if start_timestamp_op == 'gt':
ts_subquery = ts_subquery.filter(
models.Sample.timestamp > start_timestamp)
else:
ts_subquery = ts_subquery.filter(
models.Sample.timestamp >= start_timestamp)
if end_timestamp:
if end_timestamp_op == 'le':
ts_subquery = ts_subquery.filter(
models.Sample.timestamp <= end_timestamp)
else:
ts_subquery = ts_subquery.filter(
models.Sample.timestamp < end_timestamp)
ts_subquery = ts_subquery.subquery()
# Now we need to get the max Sample.id out of the leftover results, to
# break any ties.
agg_subquery = session.query(
func.max(models.Sample.id).label("max_id"),
ts_subquery
).filter(
models.Sample.resource_id == ts_subquery.c.resource_id,
models.Sample.timestamp == ts_subquery.c.max_ts
).group_by(
ts_subquery.c.resource_id,
ts_subquery.c.max_ts,
ts_subquery.c.min_ts
).subquery()
query = session.query(
models.Sample,
agg_subquery.c.min_ts,
agg_subquery.c.max_ts
).filter(
models.Sample.id == agg_subquery.c.max_id
)
for sample, first_ts, last_ts in query.all():
yield api_models.Resource(
resource_id=sample.resource_id,
project_id=sample.project_id,
first_sample_timestamp=first_ts,
last_sample_timestamp=last_ts,
source=sample.sources[0].id,
user_id=sample.user_id,
metadata=sample.resource_metadata
)
def get_meters(self, user=None, project=None, resource=None, source=None, def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}, pagination=None): metaquery={}, pagination=None):
@ -544,66 +521,48 @@ class Connection(base.Connection):
if pagination: if pagination:
raise NotImplementedError('Pagination not implemented') raise NotImplementedError('Pagination not implemented')
def _apply_filters(query):
#TODO(gordc) this should be merged with make_query_from_filter
for column, value in [(models.Sample.resource_id, resource),
(models.Sample.user_id, user),
(models.Sample.project_id, project)]:
if value:
query = query.filter(column == value)
if source is not None:
query = query.filter(
models.Sample.sources.any(id=source))
if metaquery:
query = apply_metaquery_filter(session, query, metaquery)
return query
session = self._get_db_session() session = self._get_db_session()
# Sample table will store large records and join with resource # sample_subq is used to reduce sample records
# will be very slow. # by selecting a record for each (resource_id, meter_id).
# subquery_sample is used to reduce sample records
# by selecting a record for each (resource_id, counter_name).
# max() is used to choice a sample record, so the latest record # max() is used to choice a sample record, so the latest record
# is selected for each (resource_id, meter.name). # is selected for each (resource_id, meter_id).
sample_subq = session.query(
subquery_sample = session.query(
func.max(models.Sample.id).label('id'))\ func.max(models.Sample.id).label('id'))\
.join(models.Meter)\ .group_by(models.Sample.meter_id, models.Sample.resource_id)
.group_by(models.Sample.resource_id, sample_subq = sample_subq.subquery()
models.Meter.name).subquery()
# The SQL of query_sample is essentially:
#
# SELECT sample.* FROM sample INNER JOIN # SELECT sample.* FROM sample INNER JOIN
# (SELECT max(sample.id) AS id FROM sample # (SELECT max(sample.id) AS id FROM sample
# GROUP BY sample.resource_id, meter.name) AS anon_2 # GROUP BY sample.resource_id, sample.meter_id) AS anon_2
# ON sample.id = anon_2.id # ON sample.id = anon_2.id
query_sample = session.query(models.MeterSample).\ query_sample = session.query(models.MeterSample).\
join(subquery_sample, join(sample_subq, models.MeterSample.id == sample_subq.c.id)
models.MeterSample.id == subquery_sample.c.id) query_sample = _apply_filters(query_sample)
if metaquery: for sample in query_sample.all():
query_sample = apply_metaquery_filter(session,
query_sample,
metaquery)
alias_sample = aliased(models.MeterSample,
query_sample.with_labels().subquery())
query = session.query(models.Resource, alias_sample)\
.filter(models.Resource.id == alias_sample.resource_id)
if user is not None:
query = query.filter(models.Resource.user_id == user)
if source is not None:
query = query.filter(models.Resource.sources.any(id=source))
if resource:
query = query.filter(models.Resource.id == resource)
if project is not None:
query = query.filter(models.Resource.project_id == project)
for resource, sample in query.all():
yield api_models.Meter( yield api_models.Meter(
name=sample.counter_name, name=sample.counter_name,
type=sample.counter_type, type=sample.counter_type,
unit=sample.counter_unit, unit=sample.counter_unit,
resource_id=resource.id, resource_id=sample.resource_id,
project_id=resource.project_id, project_id=sample.project_id,
source=resource.sources[0].id, source=sample.sources[0].id,
user_id=resource.user_id) user_id=sample.user_id)
def _apply_options(self, query, orderby, limit, table):
query = self._apply_order_by(query, orderby, table)
if limit is not None:
query = query.limit(limit)
return query
def _retrieve_samples(self, query): def _retrieve_samples(self, query):
samples = query.all() samples = query.all()

View File

@ -515,3 +515,31 @@ class TestListResources(FunctionalTest,
self.assertTrue((self.PATH_PREFIX + '/meters/instance?' self.assertTrue((self.PATH_PREFIX + '/meters/instance?'
'q.field=resource_id&q.value=resource-id') 'q.field=resource_id&q.value=resource-id')
in links[1]['href']) in links[1]['href'])
def test_resource_skip_meter_links(self):
sample1 = sample.Sample(
'instance',
'cumulative',
'',
1,
'user-id',
'project-id',
'resource-id',
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
resource_metadata={'display_name': 'test-server',
'tag': 'self.sample',
},
source='test_list_resources',
)
msg = utils.meter_message_from_counter(
sample1,
self.CONF.publisher.metering_secret,
)
self.conn.record_metering_data(msg)
data = self.get_json('/resources?meter_links=0')
links = data[0]['links']
self.assertEqual(len(links), 1)
self.assertEqual(links[0]['rel'], 'self')
self.assertTrue((self.PATH_PREFIX + '/resources/resource-id')
in links[0]['href'])