Merge "add support of batch recording metering data for mongodb"
commit e710c5d5b4
@@ -71,6 +71,8 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,

     def record_metering_data(self, data):
         # We may have receive only one counter on the wire
+        if not data:
+            return
         if not isinstance(data, list):
             data = [data]

@@ -82,18 +84,18 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
                  'resource_id': meter['resource_id'],
                  'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
                  'counter_volume': meter['counter_volume']})
-            try:
-                # Convert the timestamp to a datetime instance.
-                # Storage engines are responsible for converting
-                # that value to something they can store.
-                if meter.get('timestamp'):
-                    ts = timeutils.parse_isotime(meter['timestamp'])
-                    meter['timestamp'] = timeutils.normalize_time(ts)
-                self.meter_conn.record_metering_data(meter)
-            except Exception as err:
-                LOG.error(_LE('Failed to record metering data: %s.'), err)
-                # raise the exception to propagate it up in the chain.
-                raise
+            # Convert the timestamp to a datetime instance.
+            # Storage engines are responsible for converting
+            # that value to something they can store.
+            if meter.get('timestamp'):
+                ts = timeutils.parse_isotime(meter['timestamp'])
+                meter['timestamp'] = timeutils.normalize_time(ts)
+        try:
+            self.meter_conn.record_metering_data_batch(data)
+        except Exception as err:
+            LOG.error(_LE('Failed to record %(len)s: %(err)s.'),
+                      {'len': len(data), 'err': err})
+            raise

     def record_events(self, events):
         if not isinstance(events, list):
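Note on the dispatcher change above: the timestamp normalization now happens per meter inside the loop, while the try/except moves outside it, so the whole list goes to the backend in a single record_metering_data_batch() call and any failure aborts (and logs) the entire batch. A minimal sketch of the normalization step, using oslo_utils.timeutils as the diff does (the sample dict is invented for illustration):

    from oslo_utils import timeutils

    meter = {'counter_name': 'cpu', 'timestamp': '2016-06-01T15:01:00+02:00'}
    if meter.get('timestamp'):
        # parse_isotime() yields a timezone-aware datetime;
        # normalize_time() converts it to naive UTC for the storage engines.
        ts = timeutils.parse_isotime(meter['timestamp'])
        meter['timestamp'] = timeutils.normalize_time(ts)
    print(meter['timestamp'])  # 2016-06-01 13:01:00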
@@ -137,6 +137,11 @@ class Connection(object):
     def upgrade():
         """Migrate the database to `version` or the most recent version."""

+    def record_metering_data_batch(self, samples):
+        """Record the metering data in batch"""
+        for s in samples:
+            self.record_metering_data(s)
+
     @staticmethod
     def record_metering_data(data):
         """Write the data to the backend storage system.
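The default record_metering_data_batch() above gives every backend a batch entry point for free: it simply loops over the existing single-sample hook, so only drivers with a native bulk path (MongoDB in this commit) need to override it. A sketch of that dispatch pattern, with hypothetical class names standing in for real drivers:

    class BaseConnection(object):
        # stand-in for ceilometer.storage.base.Connection
        def record_metering_data_batch(self, samples):
            for s in samples:  # fallback: one write per sample
                self.record_metering_data(s)

        def record_metering_data(self, data):
            raise NotImplementedError

    class BulkConnection(BaseConnection):
        def record_metering_data_batch(self, samples):
            print('native bulk write of %d samples' % len(samples))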
@@ -20,6 +20,9 @@
 # under the License.
 """MongoDB storage backend"""

+import itertools
+import operator
+
 import copy
 import datetime
 import uuid
@@ -243,69 +246,95 @@ class Connection(pymongo_base.Connection):
         self.conn.close()

     def record_metering_data(self, data):
-        """Write the data to the backend storage system.
+        # TODO(liusheng): this is a workaround that is because there are
+        # storage scenario tests which directly invoke this method and pass a
+        # sample dict with all the storage backends and
+        # call conn.record_metering_data. May all the Ceilometer
+        # native storage backends can support batch recording in future, and
+        # then we need to refactor the scenario tests.
+        self.record_metering_data_batch([data])

-        :param data: a dictionary such as returned by
-                     ceilometer.publisher.utils.meter_message_from_counter
+    def record_metering_data_batch(self, samples):
+        """Record the metering data in batch.
+
+        :param samples: a list of samples dict.
         """
-        # Record the updated resource metadata - we use $setOnInsert to
-        # unconditionally insert sample timestamps and resource metadata
-        # (in the update case, this must be conditional on the sample not
-        # being out-of-order)
-        data = copy.deepcopy(data)
-        data['resource_metadata'] = pymongo_utils.improve_keys(
-            data.pop('resource_metadata'))
-        resource = self.db.resource.find_one_and_update(
-            {'_id': data['resource_id']},
-            {'$set': {'project_id': data['project_id'],
-                      'user_id': data['user_id'],
-                      'source': data['source'],
-                      },
-             '$setOnInsert': {'metadata': data['resource_metadata'],
-                              'first_sample_timestamp': data['timestamp'],
-                              'last_sample_timestamp': data['timestamp'],
-                              },
-             '$addToSet': {'meter': {'counter_name': data['counter_name'],
-                                     'counter_type': data['counter_type'],
-                                     'counter_unit': data['counter_unit'],
-                                     },
-                           },
-             },
-            upsert=True,
-            return_document=pymongo.ReturnDocument.AFTER,
-        )
-
-        # only update last sample timestamp if actually later (the usual
-        # in-order case)
-        last_sample_timestamp = resource.get('last_sample_timestamp')
-        if (last_sample_timestamp is None or
-                last_sample_timestamp <= data['timestamp']):
-            self.db.resource.update_one(
-                {'_id': data['resource_id']},
-                {'$set': {'metadata': data['resource_metadata'],
-                          'last_sample_timestamp': data['timestamp']}}
-            )
+        sorted_samples = sorted(
+            copy.deepcopy(samples),
+            key=lambda s: (s['resource_id'], s['timestamp']))
+        res_grouped_samples = itertools.groupby(
+            sorted_samples, key=operator.itemgetter('resource_id'))
+        samples_to_update_resource = []
+        for resource_id, g_samples in res_grouped_samples:
+            g_samples = list(g_samples)
+            g_samples[-1]['meter'] = [{'counter_name': s['counter_name'],
+                                       'counter_type': s['counter_type'],
+                                       'counter_unit': s['counter_unit'],
+                                       } for s in g_samples]
+            g_samples[-1]['last_sample_timestamp'] = g_samples[-1]['timestamp']
+            g_samples[-1]['first_sample_timestamp'] = g_samples[0]['timestamp']
+            samples_to_update_resource.append(g_samples[-1])
+        for sample in samples_to_update_resource:
+            sample['resource_metadata'] = pymongo_utils.improve_keys(
+                sample.pop('resource_metadata'))
+            resource = self.db.resource.find_one_and_update(
+                {'_id': sample['resource_id']},
+                {'$set': {'project_id': sample['project_id'],
+                          'user_id': sample['user_id'],
+                          'source': sample['source'],
+                          },
+                 '$setOnInsert': {
+                     'metadata': sample['resource_metadata'],
+                     'first_sample_timestamp': sample['timestamp'],
+                     'last_sample_timestamp': sample['timestamp'],
+                 },
+                 '$addToSet': {
+                     'meter': {'$each': sample['meter']},
+                 },
+                 },
+                upsert=True,
+                return_document=pymongo.ReturnDocument.AFTER,
+            )

-        # only update first sample timestamp if actually earlier (the unusual
-        # out-of-order case)
-        # NOTE: a null first sample timestamp is not updated as this indicates
-        # a pre-existing resource document dating from before we started
-        # recording these timestamps in the resource collection
-        first_sample_timestamp = resource.get('first_sample_timestamp')
-        if (first_sample_timestamp is not None and
-                first_sample_timestamp > data['timestamp']):
-            self.db.resource.update_one(
-                {'_id': data['resource_id']},
-                {'$set': {'first_sample_timestamp': data['timestamp']}}
-            )
+            # only update last sample timestamp if actually later (the usual
+            # in-order case)
+            last_sample_timestamp = resource.get('last_sample_timestamp')
+            if (last_sample_timestamp is None or
+                    last_sample_timestamp <= sample['last_sample_timestamp']):
+                self.db.resource.update_one(
+                    {'_id': sample['resource_id']},
+                    {'$set': {'metadata': sample['resource_metadata'],
+                              'last_sample_timestamp':
+                                  sample['last_sample_timestamp']}}
+                )
+
+            # only update first sample timestamp if actually earlier (
+            # the unusual out-of-order case)
+            # NOTE: a null first sample timestamp is not updated as this
+            # indicates a pre-existing resource document dating from before
+            # we started recording these timestamps in the resource collection
+            first_sample_timestamp = resource.get('first_sample_timestamp')
+            if (first_sample_timestamp is not None and
+                    first_sample_timestamp > sample['first_sample_timestamp']):
+                self.db.resource.update_one(
+                    {'_id': sample['resource_id']},
+                    {'$set': {'first_sample_timestamp':
+                                  sample['first_sample_timestamp']}}
+                )

-        # Record the raw data for the meter. Use a copy so we do not
-        # modify a data structure owned by our caller (the driver adds
-        # a new key '_id').
-        record = copy.copy(data)
-        record['recorded_at'] = timeutils.utcnow()
-
-        self.db.meter.insert_one(record)
+        record = copy.deepcopy(samples)
+        for s in record:
+            s['recorded_at'] = timeutils.utcnow()
+            s['resource_metadata'] = pymongo_utils.improve_keys(
+                s.pop('resource_metadata'))
+        self.db.meter.insert_many(record)

     def clear_expired_metering_data(self, ttl):
         """Clear expired data from the backend storage system.
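The batch path above depends on sorting by (resource_id, timestamp) before itertools.groupby(), since groupby() only merges adjacent items; the timestamp tie-breaker makes g_samples[-1] the newest sample of each group, which is the one used to update the resource document. A toy run of just that grouping step (data invented for illustration):

    import itertools
    import operator

    samples = [
        {'resource_id': 'r1', 'timestamp': 2},
        {'resource_id': 'r2', 'timestamp': 1},
        {'resource_id': 'r1', 'timestamp': 1},
    ]
    sorted_samples = sorted(
        samples, key=lambda s: (s['resource_id'], s['timestamp']))
    for rid, group in itertools.groupby(
            sorted_samples, key=operator.itemgetter('resource_id')):
        g = list(group)
        # first/last timestamps per resource, as the driver records them
        print(rid, g[0]['timestamp'], g[-1]['timestamp'])
    # r1 1 2
    # r2 1 1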
@@ -3198,3 +3198,39 @@ class TestRecordUnicodeSamples(DBTestBase):
         self.assertEqual(expected['counter_name'], actual['counter_name'])
         self.assertEqual(expected['resource_metadata'],
                          actual['resource_metadata'])
+
+
+@tests_db.run_with('mongodb')
+class TestBatchRecordingMetering(tests_db.TestBase):
+    def test_batch_recording_metering_data(self):
+        self.sample_dicts = []
+        for i in range(1, 10):
+            s = sample.Sample(name='sample-%s' % i,
+                              type=sample.TYPE_CUMULATIVE,
+                              unit='',
+                              volume=i * 0.1,
+                              user_id='user-id',
+                              project_id='project-id',
+                              resource_id='resource-%s' % str(i % 3),
+                              timestamp=datetime.datetime(2016, 6, 1, 15, i),
+                              resource_metadata={'fake_meta': i},
+                              source=None)
+            s_dict = utils.meter_message_from_counter(
+                s, self.CONF.publisher.telemetry_secret)
+            self.sample_dicts.append(s_dict)
+        self.conn.record_metering_data_batch(self.sample_dicts)
+        results = list(self.conn.query_samples())
+        self.assertEqual(len(self.sample_dicts), len(results))
+        for sample_item in results:
+            d = sample_item.as_dict()
+            del d['recorded_at']
+            self.assertIn(d, self.sample_dicts)
+
+        resources = list(self.conn.get_resources())
+        self.assertEqual(3, len(resources))
+        self.assertEqual('resource-0', resources[0].resource_id)
+        self.assertEqual({'fake_meta': 9}, resources[0].metadata)
+        self.assertEqual('resource-2', resources[1].resource_id)
+        self.assertEqual({'fake_meta': 8}, resources[1].metadata)
+        self.assertEqual('resource-1', resources[2].resource_id)
+        self.assertEqual({'fake_meta': 7}, resources[2].metadata)
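For reference, the nine samples in this test land on three resources via i % 3: resource-1 gets i = 1, 4, 7; resource-2 gets 2, 5, 8; resource-0 gets 3, 6, 9. Because resource metadata tracks the latest sample per resource, the expected fake_meta values are 7, 8 and 9, matching the assertions above. A quick check of that mapping:

    latest = {}
    for i in range(1, 10):
        latest['resource-%s' % (i % 3)] = {'fake_meta': i}  # later i wins
    for rid in sorted(latest):
        print(rid, latest[rid])
    # resource-0 {'fake_meta': 9}
    # resource-1 {'fake_meta': 7}
    # resource-2 {'fake_meta': 8}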
@@ -91,7 +91,7 @@ class TestDispatcherDB(base.BaseTestCase):

             called = False

-            def record_metering_data(self, data):
+            def record_metering_data_batch(self, data):
                 self.called = True

         self.dispatcher._meter_conn = ErrorConnection()
@@ -0,0 +1,6 @@
+---
+features:
+  - Add support for batch recording of metering data to the MongoDB backend.
+    Since pymongo supports the *insert_many* interface, which can be used to
+    record items in batches, this change can improve the performance of
+    metering data recording in "big-data" scenarios.
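The pymongo interface the release note refers to is Collection.insert_many(), which submits all documents in one bulk operation instead of one round trip per sample. A minimal standalone sketch (the URI, database, and collection names are placeholders):

    import datetime

    import pymongo

    client = pymongo.MongoClient('mongodb://localhost:27017')  # placeholder
    meter = client['ceilometer_demo']['meter']

    docs = [{'counter_name': 'sample-%d' % i,
             'recorded_at': datetime.datetime.utcnow()} for i in range(3)]
    result = meter.insert_many(docs)  # one bulk write, not three inserts
    print(len(result.inserted_ids))  # 3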