Add support for batch recording of metering data for MongoDB

Add support for batch recording of metering data to the MongoDB backend:
pymongo provides an *insert_many* interface that can be used to record
items in batches. In "big data" scenarios, this change can improve the
performance of metering data recording.

Change-Id: I9e1e61d5bb139349f7c2263fff8d230cb7096b5a
Author: liusheng
Date:   2016-05-31 16:30:39 +08:00
parent  c51c9f1564
commit  a2a04e5d23
6 changed files with 142 additions and 64 deletions
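The performance claim rests on pymongo's bulk write API: *insert_many*
sends a batch of documents in a single call rather than one round trip
per *insert_one*. A minimal sketch of the idea outside Ceilometer
(connection string, database, and collection names are illustrative,
not from this commit):

    import pymongo

    client = pymongo.MongoClient('mongodb://localhost:27017')
    meter = client['ceilometer']['meter']

    docs = [{'counter_name': 'cpu', 'counter_volume': i}
            for i in range(1000)]

    # One bulk call instead of 1000 per-document round trips.
    meter.insert_many(docs)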

ceilometer/dispatcher/database.py

@@ -71,6 +71,8 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
     def record_metering_data(self, data):
         # We may have receive only one counter on the wire
+        if not data:
+            return
         if not isinstance(data, list):
             data = [data]

@@ -82,17 +84,17 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
                  'resource_id': meter['resource_id'],
                  'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
                  'counter_volume': meter['counter_volume']})
-            try:
-                # Convert the timestamp to a datetime instance.
-                # Storage engines are responsible for converting
-                # that value to something they can store.
-                if meter.get('timestamp'):
-                    ts = timeutils.parse_isotime(meter['timestamp'])
-                    meter['timestamp'] = timeutils.normalize_time(ts)
-                self.meter_conn.record_metering_data(meter)
-            except Exception as err:
-                LOG.error(_LE('Failed to record metering data: %s.'), err)
-                # raise the exception to propagate it up in the chain.
-                raise
+            # Convert the timestamp to a datetime instance.
+            # Storage engines are responsible for converting
+            # that value to something they can store.
+            if meter.get('timestamp'):
+                ts = timeutils.parse_isotime(meter['timestamp'])
+                meter['timestamp'] = timeutils.normalize_time(ts)
+        try:
+            self.meter_conn.record_metering_data_batch(data)
+        except Exception as err:
+            LOG.error(_LE('Failed to record %(len)s samples: %(err)s.'),
+                      {'len': len(data), 'err': err})
+            raise

     def record_events(self, events):
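Note what this hunk changes besides the method name: timestamps are
still normalized per sample, but the try/except now wraps a single
batch call, so one bad sample fails (and re-raises for) the whole
batch. A standalone sketch of the new control flow, with a stub
connection standing in for the real backend (everything here is
illustrative except the record_metering_data_batch entry point):

    class StubMeterConn(object):
        # Stands in for self.meter_conn; only the batch entry point is hit.
        def record_metering_data_batch(self, samples):
            if any('counter_volume' not in s for s in samples):
                raise ValueError('bad sample in batch')

    def dispatch(conn, data):
        if not data:
            return
        if not isinstance(data, list):
            data = [data]  # a single sample on the wire is still accepted
        try:
            conn.record_metering_data_batch(data)  # one call per batch
        except Exception:
            raise  # the whole batch propagates up, not a single sample

    dispatch(StubMeterConn(), [{'counter_volume': 1},
                               {'counter_volume': 2}])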

ceilometer/storage/base.py

@@ -137,6 +137,11 @@ class Connection(object):
     def upgrade():
         """Migrate the database to `version` or the most recent version."""

+    def record_metering_data_batch(self, samples):
+        """Record the metering data in batch."""
+        for s in samples:
+            self.record_metering_data(s)
+
     @staticmethod
     def record_metering_data(data):
         """Write the data to the backend storage system.

ceilometer/storage/impl_mongodb.py

@@ -20,6 +20,9 @@
 #    under the License.
 """MongoDB storage backend"""

+import itertools
+import operator
+
 import copy
 import datetime
 import uuid
@@ -243,32 +246,54 @@ class Connection(pymongo_base.Connection):
         self.conn.close()

     def record_metering_data(self, data):
-        """Write the data to the backend storage system.
+        # TODO(liusheng): this is a workaround, needed because the
+        # storage scenario tests invoke this method directly with a
+        # single sample dict against every storage backend. Once all
+        # the Ceilometer native storage backends support batch
+        # recording, the scenario tests can be refactored.
+        self.record_metering_data_batch([data])

-        :param data: a dictionary such as returned by
-                     ceilometer.publisher.utils.meter_message_from_counter
+    def record_metering_data_batch(self, samples):
+        """Record the metering data in batch.
+
+        :param samples: a list of sample dicts.
         """
         # Record the updated resource metadata - we use $setOnInsert to
         # unconditionally insert sample timestamps and resource metadata
         # (in the update case, this must be conditional on the sample not
         # being out-of-order)
-        data = copy.deepcopy(data)
-        data['resource_metadata'] = pymongo_utils.improve_keys(
-            data.pop('resource_metadata'))
-        resource = self.db.resource.find_one_and_update(
-            {'_id': data['resource_id']},
-            {'$set': {'project_id': data['project_id'],
-                      'user_id': data['user_id'],
-                      'source': data['source'],
-                      },
-             '$setOnInsert': {'metadata': data['resource_metadata'],
-                              'first_sample_timestamp': data['timestamp'],
-                              'last_sample_timestamp': data['timestamp'],
-                              },
-             '$addToSet': {'meter': {'counter_name': data['counter_name'],
-                                     'counter_type': data['counter_type'],
-                                     'counter_unit': data['counter_unit'],
-                                     },
-                           },
-             },
-            upsert=True,
+        # Sort by (resource_id, timestamp) so each group is a
+        # time-ordered run, then group the samples per resource.
+        sorted_samples = sorted(
+            copy.deepcopy(samples),
+            key=lambda s: (s['resource_id'], s['timestamp']))
+        res_grouped_samples = itertools.groupby(
+            sorted_samples, key=operator.itemgetter('resource_id'))
+        samples_to_update_resource = []
+        for resource_id, g_samples in res_grouped_samples:
+            g_samples = list(g_samples)
+            g_samples[-1]['meter'] = [{'counter_name': s['counter_name'],
+                                       'counter_type': s['counter_type'],
+                                       'counter_unit': s['counter_unit'],
+                                       } for s in g_samples]
+            g_samples[-1]['last_sample_timestamp'] = g_samples[-1]['timestamp']
+            g_samples[-1]['first_sample_timestamp'] = g_samples[0]['timestamp']
+            samples_to_update_resource.append(g_samples[-1])
+
+        for sample in samples_to_update_resource:
+            sample['resource_metadata'] = pymongo_utils.improve_keys(
+                sample.pop('resource_metadata'))
+            resource = self.db.resource.find_one_and_update(
+                {'_id': sample['resource_id']},
+                {'$set': {'project_id': sample['project_id'],
+                          'user_id': sample['user_id'],
+                          'source': sample['source'],
+                          },
+                 '$setOnInsert': {
+                     'metadata': sample['resource_metadata'],
+                     'first_sample_timestamp': sample['timestamp'],
+                     'last_sample_timestamp': sample['timestamp'],
+                 },
+                 '$addToSet': {
+                     'meter': {'$each': sample['meter']},
+                 },
+                 },
+                upsert=True,
@@ -279,33 +304,37 @@ class Connection(pymongo_base.Connection):
-        # only update last sample timestamp if actually later (the usual
-        # in-order case)
-        last_sample_timestamp = resource.get('last_sample_timestamp')
-        if (last_sample_timestamp is None or
-                last_sample_timestamp <= data['timestamp']):
-            self.db.resource.update_one(
-                {'_id': data['resource_id']},
-                {'$set': {'metadata': data['resource_metadata'],
-                          'last_sample_timestamp': data['timestamp']}}
-            )
-
-        # only update first sample timestamp if actually earlier (the unusual
-        # out-of-order case)
-        # NOTE: a null first sample timestamp is not updated as this indicates
-        # a pre-existing resource document dating from before we started
-        # recording these timestamps in the resource collection
-        first_sample_timestamp = resource.get('first_sample_timestamp')
-        if (first_sample_timestamp is not None and
-                first_sample_timestamp > data['timestamp']):
-            self.db.resource.update_one(
-                {'_id': data['resource_id']},
-                {'$set': {'first_sample_timestamp': data['timestamp']}}
-            )
+            # only update last sample timestamp if actually later (the
+            # usual in-order case)
+            last_sample_timestamp = resource.get('last_sample_timestamp')
+            if (last_sample_timestamp is None or
+                    last_sample_timestamp <=
+                    sample['last_sample_timestamp']):
+                self.db.resource.update_one(
+                    {'_id': sample['resource_id']},
+                    {'$set': {'metadata': sample['resource_metadata'],
+                              'last_sample_timestamp':
+                                  sample['last_sample_timestamp']}}
+                )
+
+            # only update first sample timestamp if actually earlier
+            # (the unusual out-of-order case)
+            # NOTE: a null first sample timestamp is not updated as this
+            # indicates a pre-existing resource document dating from
+            # before we started recording these timestamps in the
+            # resource collection
+            first_sample_timestamp = resource.get('first_sample_timestamp')
+            if (first_sample_timestamp is not None and
+                    first_sample_timestamp >
+                    sample['first_sample_timestamp']):
+                self.db.resource.update_one(
+                    {'_id': sample['resource_id']},
+                    {'$set': {'first_sample_timestamp':
+                                  sample['first_sample_timestamp']}}
+                )

         # Record the raw data for the meter. Use a copy so we do not
         # modify a data structure owned by our caller (the driver adds
         # a new key '_id').
-        record = copy.copy(data)
-        record['recorded_at'] = timeutils.utcnow()
-
-        self.db.meter.insert_one(record)
+        record = copy.deepcopy(samples)
+        for s in record:
+            s['recorded_at'] = timeutils.utcnow()
+            s['resource_metadata'] = pymongo_utils.improve_keys(
+                s.pop('resource_metadata'))
+        self.db.meter.insert_many(record)

     def clear_expired_metering_data(self, ttl):
         """Clear expired data from the backend storage system.

ceilometer/tests/functional/storage/test_storage_scenarios.py

@@ -3198,3 +3198,39 @@ class TestRecordUnicodeSamples(DBTestBase):
         self.assertEqual(expected['counter_name'], actual['counter_name'])
         self.assertEqual(expected['resource_metadata'],
                          actual['resource_metadata'])
+
+
+@tests_db.run_with('mongodb')
+class TestBatchRecordingMetering(tests_db.TestBase):
+    def test_batch_recording_metering_data(self):
+        self.sample_dicts = []
+        for i in range(1, 10):
+            s = sample.Sample(name='sample-%s' % i,
+                              type=sample.TYPE_CUMULATIVE,
+                              unit='',
+                              volume=i * 0.1,
+                              user_id='user-id',
+                              project_id='project-id',
+                              resource_id='resource-%s' % str(i % 3),
+                              timestamp=datetime.datetime(2016, 6, 1, 15, i),
+                              resource_metadata={'fake_meta': i},
+                              source=None)
+            s_dict = utils.meter_message_from_counter(
+                s, self.CONF.publisher.telemetry_secret)
+            self.sample_dicts.append(s_dict)
+        self.conn.record_metering_data_batch(self.sample_dicts)
+        results = list(self.conn.query_samples())
+        self.assertEqual(len(self.sample_dicts), len(results))
+        for sample_item in results:
+            d = sample_item.as_dict()
+            del d['recorded_at']
+            self.assertIn(d, self.sample_dicts)
+
+        resources = list(self.conn.get_resources())
+        self.assertEqual(3, len(resources))
+        self.assertEqual('resource-0', resources[0].resource_id)
+        self.assertEqual({'fake_meta': 9}, resources[0].metadata)
+        self.assertEqual('resource-2', resources[1].resource_id)
+        self.assertEqual({'fake_meta': 8}, resources[1].metadata)
+        self.assertEqual('resource-1', resources[2].resource_id)
+        self.assertEqual({'fake_meta': 7}, resources[2].metadata)

ceilometer/tests/unit/dispatcher/test_db.py

@@ -91,7 +91,7 @@ class TestDispatcherDB(base.BaseTestCase):
             called = False

-            def record_metering_data(self, data):
+            def record_metering_data_batch(self, data):
                 self.called = True

         self.dispatcher._meter_conn = ErrorConnection()

releasenotes/notes/... (new release note)

@@ -0,0 +1,6 @@
+---
+features:
+  - Add support for batch recording of metering data to the MongoDB
+    backend. pymongo provides an *insert_many* interface that can be
+    used to record items in batches; in "big data" scenarios, this
+    change can improve the performance of metering data recording.