Merge "refactor db2 get_meter_statistics method to support mongodb and db2"
This commit is contained in:
commit
9787e44190
@ -21,16 +21,20 @@
|
||||
"""DB2 storage backend
|
||||
"""
|
||||
|
||||
from __future__ import division
|
||||
import copy
|
||||
import weakref
|
||||
import itertools
|
||||
|
||||
import bson.code
|
||||
import bson.objectid
|
||||
import datetime
|
||||
import pymongo
|
||||
import sys
|
||||
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import timeutils
|
||||
from ceilometer import storage
|
||||
from ceilometer.storage import base
|
||||
from ceilometer.storage import models
|
||||
@ -182,6 +186,8 @@ class Connection(base.Connection):
|
||||
|
||||
SORT_OPERATION_MAP = {'desc': pymongo.DESCENDING, 'asc': pymongo.ASCENDING}
|
||||
|
||||
SECONDS_IN_A_DAY = 86400
|
||||
|
||||
def __init__(self, conf):
|
||||
url = conf.database.connection
|
||||
|
||||
@ -291,6 +297,7 @@ class Connection(base.Connection):
|
||||
# removal of all the empty dbs created during the test runs since
|
||||
# test run is against mongodb on Jenkins
|
||||
self.conn.drop_database(self.db)
|
||||
self.conn.close()
|
||||
|
||||
def record_metering_data(self, data):
|
||||
"""Write the data to the backend storage system.
|
||||
@ -521,42 +528,81 @@ class Connection(base.Connection):
|
||||
statistics described by the query parameters.
|
||||
|
||||
The filter must have a meter value set.
|
||||
|
||||
"""
|
||||
if self._using_mongodb:
|
||||
raise NotImplementedError("Statistics not implemented.")
|
||||
|
||||
if groupby:
|
||||
raise NotImplementedError("Group by not implemented.")
|
||||
if (groupby and
|
||||
set(groupby) - set(['user_id', 'project_id',
|
||||
'resource_id', 'source'])):
|
||||
raise NotImplementedError("Unable to group by these fields")
|
||||
|
||||
q = make_query_from_filter(sample_filter)
|
||||
|
||||
if period:
|
||||
raise NotImplementedError('Statistics for period not implemented.')
|
||||
if sample_filter.start:
|
||||
period_start = sample_filter.start
|
||||
else:
|
||||
period_start = self.db.meter.find(
|
||||
limit=1, sort=[('timestamp',
|
||||
pymongo.ASCENDING)])[0]['timestamp']
|
||||
|
||||
results = self.db.meter.aggregate([
|
||||
{'$match': q},
|
||||
{'$group': self.GROUP},
|
||||
{'$project': self.PROJECT},
|
||||
])
|
||||
|
||||
# Since there is no period grouping, there should be only one set in
|
||||
# the results
|
||||
rslt = results['result'][0]
|
||||
|
||||
duration = rslt['duration_end'] - rslt['duration_start']
|
||||
if hasattr(duration, 'total_seconds'):
|
||||
rslt['duration'] = duration.total_seconds()
|
||||
if groupby:
|
||||
sort_keys = ['counter_name'] + groupby + ['timestamp']
|
||||
else:
|
||||
rslt['duration'] = duration.days * 3600 + duration.seconds
|
||||
sort_keys = ['counter_name', 'timestamp']
|
||||
|
||||
rslt['period_start'] = rslt['duration_start']
|
||||
rslt['period_end'] = rslt['duration_end']
|
||||
# Period is not supported, set it to zero
|
||||
rslt['period'] = 0
|
||||
rslt['groupby'] = None
|
||||
sort_instructions = self._build_sort_instructions(sort_keys=sort_keys,
|
||||
sort_dir='asc')
|
||||
meters = self.db.meter.find(q, sort=sort_instructions)
|
||||
|
||||
return [models.Statistics(**(rslt))]
|
||||
def _group_key(meter):
|
||||
# the method to define a key for groupby call
|
||||
key = {}
|
||||
for y in sort_keys:
|
||||
if y == 'timestamp' and period:
|
||||
key[y] = (timeutils.delta_seconds(period_start,
|
||||
meter[y]) // period)
|
||||
elif y != 'timestamp':
|
||||
key[y] = meter[y]
|
||||
return key
|
||||
|
||||
def _to_offset(periods):
|
||||
return {'days': (periods * period) // self.SECONDS_IN_A_DAY,
|
||||
'seconds': (periods * period) % self.SECONDS_IN_A_DAY}
|
||||
|
||||
for key, grouped_meters in itertools.groupby(meters, key=_group_key):
|
||||
stat = models.Statistics(None, sys.maxint, -sys.maxint, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, None)
|
||||
|
||||
for meter in grouped_meters:
|
||||
stat.unit = meter.get('counter_unit', '')
|
||||
m_volume = meter.get('counter_volume')
|
||||
if stat.min > m_volume:
|
||||
stat.min = m_volume
|
||||
if stat.max < m_volume:
|
||||
stat.max = m_volume
|
||||
stat.sum += m_volume
|
||||
stat.count += 1
|
||||
if stat.duration_start == 0:
|
||||
stat.duration_start = meter['timestamp']
|
||||
stat.duration_end = meter['timestamp']
|
||||
if groupby and not stat.groupby:
|
||||
stat.groupby = {}
|
||||
for group_key in groupby:
|
||||
stat.groupby[group_key] = meter[group_key]
|
||||
|
||||
stat.duration = timeutils.delta_seconds(stat.duration_start,
|
||||
stat.duration_end)
|
||||
stat.avg = stat.sum / stat.count
|
||||
if period:
|
||||
stat.period = period
|
||||
periods = key.get('timestamp')
|
||||
stat.period_start = period_start + \
|
||||
datetime.timedelta(**(_to_offset(periods)))
|
||||
stat.period_end = period_start + \
|
||||
datetime.timedelta(**(_to_offset(periods + 1)))
|
||||
else:
|
||||
stat.period_start = stat.duration_start
|
||||
stat.period_end = stat.duration_end
|
||||
yield stat
|
||||
|
||||
@staticmethod
|
||||
def _decode_matching_metadata(matching_metadata):
|
||||
|
@ -46,5 +46,5 @@ MongoDB Yes Yes Yes
|
||||
MySQL Yes, except metadata querying Yes Yes
|
||||
PostgreSQL Yes, except metadata querying Yes Yes
|
||||
HBase Yes Yes, except groupby No
|
||||
DB2 Yes No No
|
||||
DB2 Yes Yes No
|
||||
================== ============================= =================== ======
|
||||
|
Loading…
Reference in New Issue
Block a user