diff --git a/ceilometer/dispatcher/database.py b/ceilometer/dispatcher/database.py
index 6d734d05ed..9738435b3f 100644
--- a/ceilometer/dispatcher/database.py
+++ b/ceilometer/dispatcher/database.py
@@ -71,6 +71,8 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
 
     def record_metering_data(self, data):
         # We may have receive only one counter on the wire
+        if not data:
+            return
         if not isinstance(data, list):
             data = [data]
 
@@ -82,18 +84,18 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
                  'resource_id': meter['resource_id'],
                  'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
                  'counter_volume': meter['counter_volume']})
-            try:
-                # Convert the timestamp to a datetime instance.
-                # Storage engines are responsible for converting
-                # that value to something they can store.
-                if meter.get('timestamp'):
-                    ts = timeutils.parse_isotime(meter['timestamp'])
-                    meter['timestamp'] = timeutils.normalize_time(ts)
-                self.meter_conn.record_metering_data(meter)
-            except Exception as err:
-                LOG.error(_LE('Failed to record metering data: %s.'), err)
-                # raise the exception to propagate it up in the chain.
-                raise
+            # Convert the timestamp to a datetime instance.
+            # Storage engines are responsible for converting
+            # that value to something they can store.
+            if meter.get('timestamp'):
+                ts = timeutils.parse_isotime(meter['timestamp'])
+                meter['timestamp'] = timeutils.normalize_time(ts)
+        try:
+            self.meter_conn.record_metering_data_batch(data)
+        except Exception as err:
+            LOG.error(_LE('Failed to record %(len)s samples: %(err)s.'),
+                      {'len': len(data), 'err': err})
+            raise
 
     def record_events(self, events):
         if not isinstance(events, list):
diff --git a/ceilometer/storage/base.py b/ceilometer/storage/base.py
index e8c4e97eb4..5b792d1b58 100644
--- a/ceilometer/storage/base.py
+++ b/ceilometer/storage/base.py
@@ -137,6 +137,11 @@ class Connection(object):
     def upgrade():
         """Migrate the database to `version` or the most recent version."""
 
+    def record_metering_data_batch(self, samples):
+        """Record the metering data in batch."""
+        for s in samples:
+            self.record_metering_data(s)
+
     @staticmethod
     def record_metering_data(data):
         """Write the data to the backend storage system.
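For context outside the patch: the new base-class method gives every backend a batch entry point even when it only implements single-sample writes. A minimal sketch of how the inherited fallback behaves (`FakeConnection` is a hypothetical stand-in, not part of Ceilometer):

```python
# Hypothetical toy backend: only record_metering_data() is
# backend-specific; record_metering_data_batch() is the base-class
# default shown in the hunk above.
class FakeConnection(object):
    def __init__(self):
        self.recorded = []

    def record_metering_data(self, data):
        self.recorded.append(data)

    def record_metering_data_batch(self, samples):
        # base-class default: degrade to one write per sample
        for s in samples:
            self.record_metering_data(s)


conn = FakeConnection()
conn.record_metering_data_batch([{'counter_volume': 1.0},
                                 {'counter_volume': 2.0}])
assert len(conn.recorded) == 2
```

Backends that can do better, such as the MongoDB driver below, simply override `record_metering_data_batch`.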
""" # Record the updated resource metadata - we use $setOnInsert to # unconditionally insert sample timestamps and resource metadata # (in the update case, this must be conditional on the sample not # being out-of-order) - data = copy.deepcopy(data) - data['resource_metadata'] = pymongo_utils.improve_keys( - data.pop('resource_metadata')) - resource = self.db.resource.find_one_and_update( - {'_id': data['resource_id']}, - {'$set': {'project_id': data['project_id'], - 'user_id': data['user_id'], - 'source': data['source'], - }, - '$setOnInsert': {'metadata': data['resource_metadata'], - 'first_sample_timestamp': data['timestamp'], - 'last_sample_timestamp': data['timestamp'], - }, - '$addToSet': {'meter': {'counter_name': data['counter_name'], - 'counter_type': data['counter_type'], - 'counter_unit': data['counter_unit'], - }, - }, - }, - upsert=True, - return_document=pymongo.ReturnDocument.AFTER, - ) - - # only update last sample timestamp if actually later (the usual - # in-order case) - last_sample_timestamp = resource.get('last_sample_timestamp') - if (last_sample_timestamp is None or - last_sample_timestamp <= data['timestamp']): - self.db.resource.update_one( - {'_id': data['resource_id']}, - {'$set': {'metadata': data['resource_metadata'], - 'last_sample_timestamp': data['timestamp']}} + sorted_samples = sorted( + copy.deepcopy(samples), + key=lambda s: (s['resource_id'], s['timestamp'])) + res_grouped_samples = itertools.groupby( + sorted_samples, key=operator.itemgetter('resource_id')) + samples_to_update_resource = [] + for resource_id, g_samples in res_grouped_samples: + g_samples = list(g_samples) + g_samples[-1]['meter'] = [{'counter_name': s['counter_name'], + 'counter_type': s['counter_type'], + 'counter_unit': s['counter_unit'], + } for s in g_samples] + g_samples[-1]['last_sample_timestamp'] = g_samples[-1]['timestamp'] + g_samples[-1]['first_sample_timestamp'] = g_samples[0]['timestamp'] + samples_to_update_resource.append(g_samples[-1]) + for sample in samples_to_update_resource: + sample['resource_metadata'] = pymongo_utils.improve_keys( + sample.pop('resource_metadata')) + resource = self.db.resource.find_one_and_update( + {'_id': sample['resource_id']}, + {'$set': {'project_id': sample['project_id'], + 'user_id': sample['user_id'], + 'source': sample['source'], + }, + '$setOnInsert': { + 'metadata': sample['resource_metadata'], + 'first_sample_timestamp': sample['timestamp'], + 'last_sample_timestamp': sample['timestamp'], + }, + '$addToSet': { + 'meter': {'$each': sample['meter']}, + }, + }, + upsert=True, + return_document=pymongo.ReturnDocument.AFTER, ) - # only update first sample timestamp if actually earlier (the unusual - # out-of-order case) - # NOTE: a null first sample timestamp is not updated as this indicates - # a pre-existing resource document dating from before we started - # recording these timestamps in the resource collection - first_sample_timestamp = resource.get('first_sample_timestamp') - if (first_sample_timestamp is not None and - first_sample_timestamp > data['timestamp']): - self.db.resource.update_one( - {'_id': data['resource_id']}, - {'$set': {'first_sample_timestamp': data['timestamp']}} - ) + # only update last sample timestamp if actually later (the usual + # in-order case) + last_sample_timestamp = resource.get('last_sample_timestamp') + if (last_sample_timestamp is None or + last_sample_timestamp <= sample['last_sample_timestamp']): + self.db.resource.update_one( + {'_id': sample['resource_id']}, + {'$set': {'metadata': 
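The sort-then-group logic above is the heart of the change and is easy to exercise in isolation. A standalone sketch of the same pattern (toy integer timestamps instead of datetimes, otherwise the same sort/group/last-element selection):

```python
import itertools
import operator

samples = [
    {'resource_id': 'r1', 'timestamp': 2},
    {'resource_id': 'r2', 'timestamp': 1},
    {'resource_id': 'r1', 'timestamp': 1},
]

# Sort by (resource_id, timestamp) so groupby sees each resource as one
# contiguous run, with the newest sample last in each run.
sorted_samples = sorted(samples,
                        key=lambda s: (s['resource_id'], s['timestamp']))
for rid, group in itertools.groupby(
        sorted_samples, key=operator.itemgetter('resource_id')):
    group = list(group)
    latest = group[-1]
    latest['first_sample_timestamp'] = group[0]['timestamp']
    latest['last_sample_timestamp'] = group[-1]['timestamp']
    print(rid, latest)
# r1 {'resource_id': 'r1', 'timestamp': 2,
#     'first_sample_timestamp': 1, 'last_sample_timestamp': 2}
# r2 {'resource_id': 'r2', 'timestamp': 1,
#     'first_sample_timestamp': 1, 'last_sample_timestamp': 1}
```

Note that `itertools.groupby` only groups consecutive equal keys, which is why the sort must come first; this is what lets the driver issue one resource upsert per resource instead of one per sample.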
diff --git a/ceilometer/tests/functional/storage/test_storage_scenarios.py b/ceilometer/tests/functional/storage/test_storage_scenarios.py
index 2f78b100c8..d0b884d664 100644
--- a/ceilometer/tests/functional/storage/test_storage_scenarios.py
+++ b/ceilometer/tests/functional/storage/test_storage_scenarios.py
@@ -3198,3 +3198,39 @@ class TestRecordUnicodeSamples(DBTestBase):
         self.assertEqual(expected['counter_name'], actual['counter_name'])
         self.assertEqual(expected['resource_metadata'],
                          actual['resource_metadata'])
+
+
+@tests_db.run_with('mongodb')
+class TestBatchRecordingMetering(tests_db.TestBase):
+    def test_batch_recording_metering_data(self):
+        self.sample_dicts = []
+        for i in range(1, 10):
+            s = sample.Sample(name='sample-%s' % i,
+                              type=sample.TYPE_CUMULATIVE,
+                              unit='',
+                              volume=i * 0.1,
+                              user_id='user-id',
+                              project_id='project-id',
+                              resource_id='resource-%s' % str(i % 3),
+                              timestamp=datetime.datetime(2016, 6, 1, 15, i),
+                              resource_metadata={'fake_meta': i},
+                              source=None)
+            s_dict = utils.meter_message_from_counter(
+                s, self.CONF.publisher.telemetry_secret)
+            self.sample_dicts.append(s_dict)
+        self.conn.record_metering_data_batch(self.sample_dicts)
+        results = list(self.conn.query_samples())
+        self.assertEqual(len(self.sample_dicts), len(results))
+        for sample_item in results:
+            d = sample_item.as_dict()
+            del d['recorded_at']
+            self.assertIn(d, self.sample_dicts)
+
+        resources = list(self.conn.get_resources())
+        self.assertEqual(3, len(resources))
+        self.assertEqual('resource-0', resources[0].resource_id)
+        self.assertEqual({'fake_meta': 9}, resources[0].metadata)
+        self.assertEqual('resource-2', resources[1].resource_id)
+        self.assertEqual({'fake_meta': 8}, resources[1].metadata)
+        self.assertEqual('resource-1', resources[2].resource_id)
+        self.assertEqual({'fake_meta': 7}, resources[2].metadata)
diff --git a/ceilometer/tests/unit/dispatcher/test_db.py b/ceilometer/tests/unit/dispatcher/test_db.py
index 2173ad1916..f6747e47a0 100644
--- a/ceilometer/tests/unit/dispatcher/test_db.py
+++ b/ceilometer/tests/unit/dispatcher/test_db.py
@@ -91,7 +91,7 @@ class TestDispatcherDB(base.BaseTestCase):
 
             called = False
 
-            def record_metering_data(self, data):
+            def record_metering_data_batch(self, data):
                 self.called = True
 
         self.dispatcher._meter_conn = ErrorConnection()
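A quick back-of-the-envelope check of the metadata assertions in the new scenario test (illustrative only, not test code): samples 1 through 9 land on resources by `i % 3`, and the newest sample per resource wins the resource metadata:

```python
# Later i means later timestamp, so the last assignment per key wins.
latest_meta = {}
for i in range(1, 10):
    latest_meta['resource-%s' % (i % 3)] = {'fake_meta': i}
print(latest_meta)
# {'resource-1': {'fake_meta': 7},
#  'resource-2': {'fake_meta': 8},
#  'resource-0': {'fake_meta': 9}}
```

The asserted resource-0, resource-2, resource-1 ordering is consistent with `get_resources()` returning resources by last sample timestamp, newest first.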
diff --git a/releasenotes/notes/support-meter-batch-recording-mongo-6c2bdf4fbb9764eb.yaml b/releasenotes/notes/support-meter-batch-recording-mongo-6c2bdf4fbb9764eb.yaml
new file mode 100644
index 0000000000..22961d0319
--- /dev/null
+++ b/releasenotes/notes/support-meter-batch-recording-mongo-6c2bdf4fbb9764eb.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - Add support for batch recording of metering data to the MongoDB
+    backend. pymongo provides an *insert_many* interface that can record
+    items in batch; in "big data" scenarios this change can improve the
+    performance of metering data recording.
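For reference, a hedged sketch of the pymongo call the release note refers to (the connection URL and database/collection names are placeholders): `insert_many` writes a list of documents in a single round trip, versus one `insert_one` call per document.

```python
import pymongo

client = pymongo.MongoClient('mongodb://localhost:27017/')  # placeholder URL
meter = client['ceilometer']['meter']                       # placeholder names

docs = [{'counter_name': 'cpu', 'counter_volume': float(i)}
        for i in range(100)]
result = meter.insert_many(docs)  # one batched round trip
print(len(result.inserted_ids))   # 100
```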