diff --git a/ceilometer/storage/impl_db2.py b/ceilometer/storage/impl_db2.py index 631cd1d85..c10c5c6f2 100644 --- a/ceilometer/storage/impl_db2.py +++ b/ceilometer/storage/impl_db2.py @@ -345,6 +345,7 @@ class Connection(base.Connection): # modify a data structure owned by our caller (the driver adds # a new key '_id'). record = copy.copy(data) + record['recorded_at'] = timeutils.utcnow() # Make sure that the data does have field _id which db2 wont add # automatically. if record.get('_id') is None: diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 0a8e64d64..1e41232db 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -230,8 +230,11 @@ class Connection(base.Connection): # alphabetically. row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest()) + recorded_at = timeutils.utcnow() + # Convert timestamp to string as json.dumps won't ts = timeutils.strtime(data['timestamp']) + recorded_at_ts = timeutils.strtime(recorded_at) record = {'f:timestamp': ts, 'f:counter_name': data['counter_name'], @@ -246,6 +249,7 @@ class Connection(base.Connection): 'f:message_id': data['message_id'], 'f:resource_id': data['resource_id'], 'f:source': data['source'], + 'f:recorded_at': recorded_at, # add in reversed_ts here for time range scan 'f:rts': str(rts) } @@ -254,6 +258,7 @@ class Connection(base.Connection): # Don't want to be changing the original data object. data = copy.copy(data) data['timestamp'] = ts + data['recorded_at'] = recorded_at_ts # Save original meter. record['f:message'] = json.dumps(data) meter_table.put(row, record) @@ -419,17 +424,20 @@ class Connection(base.Connection): user_id=data['f:user_id'], ) + @staticmethod + def _make_sample(data): + """Transform HBase fields to Sample model.""" + data = json.loads(data['f:message']) + data['timestamp'] = timeutils.parse_strtime(data['timestamp']) + data['recorded_at'] = timeutils.parse_strtime(data['recorded_at']) + return models.Sample(**data) + def get_samples(self, sample_filter, limit=None): """Return an iterable of models.Sample instances. :param sample_filter: Filter. :param limit: Maximum number of results to return. """ - def make_sample(data): - """Transform HBase fields to Sample model.""" - data = json.loads(data['f:message']) - data['timestamp'] = timeutils.parse_strtime(data['timestamp']) - return models.Sample(**data) meter_table = self.conn.table(self.METER_TABLE) @@ -467,11 +475,11 @@ class Connection(base.Connection): else: if limit: limit -= 1 - yield make_sample(meter) + yield self._make_sample(meter) else: if limit: limit -= 1 - yield make_sample(meter) + yield self._make_sample(meter) @staticmethod def _update_meter_stats(stat, meter): diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 62fff58b4..05d4633f5 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -35,6 +35,7 @@ from oslo.config import cfg from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log +from ceilometer.openstack.common import timeutils from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models @@ -470,6 +471,7 @@ class Connection(base.Connection): # modify a data structure owned by our caller (the driver adds # a new key '_id'). record = copy.copy(data) + record['recorded_at'] = timeutils.utcnow() self.db.meter.insert(record) def clear_expired_metering_data(self, ttl): diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index 47bef79da..eda5af965 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -560,6 +560,7 @@ class Connection(base.Connection): project_id=s.project_id, resource_id=s.resource_id, timestamp=s.timestamp, + recorded_at=s.recorded_at, resource_metadata=s.resource_metadata, message_id=s.message_id, message_signature=s.message_signature, diff --git a/ceilometer/storage/models.py b/ceilometer/storage/models.py index ea6e935a4..dbcee3367 100644 --- a/ceilometer/storage/models.py +++ b/ceilometer/storage/models.py @@ -205,6 +205,7 @@ class Sample(Model): timestamp, resource_metadata, message_id, message_signature, + recorded_at, ): """Create a new sample. @@ -219,6 +220,7 @@ class Sample(Model): :param timestamp: the time of the measurement :param resource_metadata: extra details about the resource :param message_id: a message identifier + :param recorded_at: sample record timestamp :param message_signature: a hash created from the rest of the message data """ @@ -234,7 +236,8 @@ class Sample(Model): timestamp=timestamp, resource_metadata=resource_metadata, message_id=message_id, - message_signature=message_signature) + message_signature=message_signature, + recorded_at=recorded_at) class Statistics(Model): diff --git a/ceilometer/storage/sqlalchemy/migrate_repo/versions/029_sample_recorded_at.py b/ceilometer/storage/sqlalchemy/migrate_repo/versions/029_sample_recorded_at.py new file mode 100644 index 000000000..72db7ba7b --- /dev/null +++ b/ceilometer/storage/sqlalchemy/migrate_repo/versions/029_sample_recorded_at.py @@ -0,0 +1,32 @@ +# -*- encoding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy + +from ceilometer.openstack.common import timeutils +from ceilometer.storage.sqlalchemy import models + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + meter = sqlalchemy.Table('meter', meta, autoload=True) + c = sqlalchemy.Column('recorded_at', models.PreciseTimestamp(), + default=timeutils.utcnow) + meter.create_column(c) + + +def downgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + meter = sqlalchemy.Table('meter', meta, autoload=True) + meter.drop_column('recorded_at') diff --git a/ceilometer/storage/sqlalchemy/models.py b/ceilometer/storage/sqlalchemy/models.py index a086c8b6f..928834fc7 100644 --- a/ceilometer/storage/sqlalchemy/models.py +++ b/ceilometer/storage/sqlalchemy/models.py @@ -24,12 +24,12 @@ import six.moves.urllib.parse as urlparse from oslo.config import cfg from sqlalchemy import Column, Integer, String, Table, ForeignKey, \ Index, UniqueConstraint, BigInteger -from sqlalchemy import Float, Boolean, Text +from sqlalchemy import Float, Boolean, Text, DateTime from sqlalchemy.dialects.mysql import DECIMAL from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import backref from sqlalchemy.orm import relationship -from sqlalchemy.types import TypeDecorator, DATETIME +from sqlalchemy.types import TypeDecorator from ceilometer.openstack.common import timeutils from ceilometer.storage import models as api_models @@ -71,14 +71,14 @@ class JSONEncodedDict(TypeDecorator): class PreciseTimestamp(TypeDecorator): """Represents a timestamp precise to the microsecond.""" - impl = DATETIME + impl = DateTime def load_dialect_impl(self, dialect): if dialect.name == 'mysql': return dialect.type_descriptor(DECIMAL(precision=20, scale=6, asdecimal=True)) - return dialect.type_descriptor(DATETIME()) + return self.impl def process_bind_param(self, value, dialect): if value is None: @@ -209,6 +209,7 @@ class Meter(Base): counter_unit = Column(String(255)) counter_volume = Column(Float(53)) timestamp = Column(PreciseTimestamp(), default=timeutils.utcnow) + recorded_at = Column(PreciseTimestamp(), default=timeutils.utcnow) message_signature = Column(String(1000)) message_id = Column(String(1000)) sources = relationship("Source", secondary=lambda: sourceassoc) diff --git a/ceilometer/tests/storage/sqlalchemy/test_models.py b/ceilometer/tests/storage/sqlalchemy/test_models.py index 73e2957e8..c77f83aaa 100644 --- a/ceilometer/tests/storage/sqlalchemy/test_models.py +++ b/ceilometer/tests/storage/sqlalchemy/test_models.py @@ -19,8 +19,9 @@ import datetime import mock +import sqlalchemy from sqlalchemy.dialects.mysql import DECIMAL -from sqlalchemy.types import DATETIME, NUMERIC +from sqlalchemy.types import NUMERIC from ceilometer.openstack.common import test from ceilometer.storage.sqlalchemy import models @@ -34,8 +35,6 @@ class PreciseTimestampTest(test.BaseTestCase): def _type_descriptor_mock(desc): if type(desc) == DECIMAL: return NUMERIC(precision=desc.precision, scale=desc.scale) - if type(desc) == DATETIME: - return DATETIME() dialect = mock.MagicMock() dialect.name = name dialect.type_descriptor = _type_descriptor_mock @@ -57,7 +56,7 @@ class PreciseTimestampTest(test.BaseTestCase): def test_load_dialect_impl_postgres(self): result = self._type.load_dialect_impl(self._postgres_dialect) - self.assertEqual(type(result), DATETIME) + self.assertEqual(type(result), sqlalchemy.DateTime) def test_process_bind_param_store_decimal_mysql(self): expected = utils.dt_to_decimal(self._date) diff --git a/ceilometer/tests/storage/test_models.py b/ceilometer/tests/storage/test_models.py index 832f58a1a..2aea7561e 100644 --- a/ceilometer/tests/storage/test_models.py +++ b/ceilometer/tests/storage/test_models.py @@ -62,7 +62,7 @@ class ModelTest(test.BaseTestCase): "counter_unit", "counter_volume", "user_id", "project_id", "resource_id", "timestamp", "resource_metadata", "message_id", - "message_signature"] + "message_signature", "recorded_at"] self.assertEqual(set(sample_fields), set(models.Sample.get_field_names())) diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py index 45b09ca2f..d7a99613a 100644 --- a/ceilometer/tests/storage/test_storage_scenarios.py +++ b/ceilometer/tests/storage/test_storage_scenarios.py @@ -3,7 +3,8 @@ # Copyright © 2013 Intel Corp. # # Author: Lianhao Lu -# Author: Shane Wang +# Shane Wang +# Julien Danjou # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -59,12 +60,10 @@ class DBTestBase(tests_db.TestBase): def setUp(self): super(DBTestBase, self).setUp() + timeutils.set_time_override( + datetime.datetime(2015, 7, 2, 10, 39)) self.prepare_data() - def tearDown(self): - timeutils.utcnow.override_time = None - super(DBTestBase, self).tearDown() - def prepare_data(self): original_timestamps = [(2012, 7, 2, 10, 40), (2012, 7, 2, 10, 41), (2012, 7, 2, 10, 41), (2012, 7, 2, 10, 42), @@ -484,6 +483,9 @@ class RawSampleTest(DBTestBase, f = storage.SampleFilter() results = list(self.conn.get_samples(f, limit=3)) self.assertEqual(len(results), 3) + for result in results: + self.assertTimestampEqual(result.recorded_at, + timeutils.utcnow()) def test_get_samples_in_default_order(self): f = storage.SampleFilter() @@ -498,7 +500,11 @@ class RawSampleTest(DBTestBase, results = list(self.conn.get_samples(f)) self.assertEqual(len(results), 3) for meter in results: - self.assertIn(meter.as_dict(), self.msgs[:3]) + d = meter.as_dict() + self.assertTimestampEqual(d['recorded_at'], + timeutils.utcnow()) + del d['recorded_at'] + self.assertIn(d, self.msgs[:3]) def test_get_samples_by_user_limit(self): f = storage.SampleFilter(user='user-id') @@ -513,25 +519,35 @@ class RawSampleTest(DBTestBase, def test_get_samples_by_project(self): f = storage.SampleFilter(project='project-id') results = list(self.conn.get_samples(f)) - assert results + self.assertIsNotNone(results) for meter in results: - self.assertIn(meter.as_dict(), self.msgs[:4]) + d = meter.as_dict() + self.assertTimestampEqual(d['recorded_at'], + timeutils.utcnow()) + del d['recorded_at'] + self.assertIn(d, self.msgs[:4]) def test_get_samples_by_resource(self): f = storage.SampleFilter(user='user-id', resource='resource-id') results = list(self.conn.get_samples(f)) assert results meter = results[1] - assert meter is not None - self.assertEqual(meter.as_dict(), self.msgs[0]) + d = meter.as_dict() + self.assertEqual(d['recorded_at'], timeutils.utcnow()) + del d['recorded_at'] + self.assertEqual(d, self.msgs[0]) def test_get_samples_by_metaquery(self): q = {'metadata.display_name': 'test-server'} f = storage.SampleFilter(metaquery=q) results = list(self.conn.get_samples(f)) - assert results + self.assertIsNotNone(results) for meter in results: - self.assertIn(meter.as_dict(), self.msgs) + d = meter.as_dict() + self.assertTimestampEqual(d['recorded_at'], + timeutils.utcnow()) + del d['recorded_at'] + self.assertIn(d, self.msgs) def test_get_samples_by_start_time(self): timestamp = datetime.datetime(2012, 7, 2, 10, 41) @@ -744,7 +760,9 @@ class ComplexSampleQueryTest(DBTestBase, results = list(self.conn.query_samples()) self.assertEqual(len(results), len(self.msgs)) for sample in results: - self.assertIn(sample.as_dict(), self.msgs) + d = sample.as_dict() + del d['recorded_at'] + self.assertIn(d, self.msgs) def test_no_filter_with_zero_limit(self): limit = 0