storage: store recording timestamp
This patch adds a record_at field in the sample that are stored that indicates when the timestamp has been stored in the database. Change-Id: Ia0ff8bd07fd811fe8d3050d30971a05a277798d0 Blueprint: storage-sample-timestamp
This commit is contained in:
@@ -345,6 +345,7 @@ class Connection(base.Connection):
|
||||
# modify a data structure owned by our caller (the driver adds
|
||||
# a new key '_id').
|
||||
record = copy.copy(data)
|
||||
record['recorded_at'] = timeutils.utcnow()
|
||||
# Make sure that the data does have field _id which db2 wont add
|
||||
# automatically.
|
||||
if record.get('_id') is None:
|
||||
|
||||
@@ -230,8 +230,11 @@ class Connection(base.Connection):
|
||||
# alphabetically.
|
||||
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
|
||||
|
||||
recorded_at = timeutils.utcnow()
|
||||
|
||||
# Convert timestamp to string as json.dumps won't
|
||||
ts = timeutils.strtime(data['timestamp'])
|
||||
recorded_at_ts = timeutils.strtime(recorded_at)
|
||||
|
||||
record = {'f:timestamp': ts,
|
||||
'f:counter_name': data['counter_name'],
|
||||
@@ -246,6 +249,7 @@ class Connection(base.Connection):
|
||||
'f:message_id': data['message_id'],
|
||||
'f:resource_id': data['resource_id'],
|
||||
'f:source': data['source'],
|
||||
'f:recorded_at': recorded_at,
|
||||
# add in reversed_ts here for time range scan
|
||||
'f:rts': str(rts)
|
||||
}
|
||||
@@ -254,6 +258,7 @@ class Connection(base.Connection):
|
||||
# Don't want to be changing the original data object.
|
||||
data = copy.copy(data)
|
||||
data['timestamp'] = ts
|
||||
data['recorded_at'] = recorded_at_ts
|
||||
# Save original meter.
|
||||
record['f:message'] = json.dumps(data)
|
||||
meter_table.put(row, record)
|
||||
@@ -419,17 +424,20 @@ class Connection(base.Connection):
|
||||
user_id=data['f:user_id'],
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _make_sample(data):
|
||||
"""Transform HBase fields to Sample model."""
|
||||
data = json.loads(data['f:message'])
|
||||
data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
|
||||
data['recorded_at'] = timeutils.parse_strtime(data['recorded_at'])
|
||||
return models.Sample(**data)
|
||||
|
||||
def get_samples(self, sample_filter, limit=None):
|
||||
"""Return an iterable of models.Sample instances.
|
||||
|
||||
:param sample_filter: Filter.
|
||||
:param limit: Maximum number of results to return.
|
||||
"""
|
||||
def make_sample(data):
|
||||
"""Transform HBase fields to Sample model."""
|
||||
data = json.loads(data['f:message'])
|
||||
data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
|
||||
return models.Sample(**data)
|
||||
|
||||
meter_table = self.conn.table(self.METER_TABLE)
|
||||
|
||||
@@ -467,11 +475,11 @@ class Connection(base.Connection):
|
||||
else:
|
||||
if limit:
|
||||
limit -= 1
|
||||
yield make_sample(meter)
|
||||
yield self._make_sample(meter)
|
||||
else:
|
||||
if limit:
|
||||
limit -= 1
|
||||
yield make_sample(meter)
|
||||
yield self._make_sample(meter)
|
||||
|
||||
@staticmethod
|
||||
def _update_meter_stats(stat, meter):
|
||||
|
||||
@@ -35,6 +35,7 @@ from oslo.config import cfg
|
||||
|
||||
from ceilometer.openstack.common.gettextutils import _ # noqa
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import timeutils
|
||||
from ceilometer import storage
|
||||
from ceilometer.storage import base
|
||||
from ceilometer.storage import models
|
||||
@@ -470,6 +471,7 @@ class Connection(base.Connection):
|
||||
# modify a data structure owned by our caller (the driver adds
|
||||
# a new key '_id').
|
||||
record = copy.copy(data)
|
||||
record['recorded_at'] = timeutils.utcnow()
|
||||
self.db.meter.insert(record)
|
||||
|
||||
def clear_expired_metering_data(self, ttl):
|
||||
|
||||
@@ -560,6 +560,7 @@ class Connection(base.Connection):
|
||||
project_id=s.project_id,
|
||||
resource_id=s.resource_id,
|
||||
timestamp=s.timestamp,
|
||||
recorded_at=s.recorded_at,
|
||||
resource_metadata=s.resource_metadata,
|
||||
message_id=s.message_id,
|
||||
message_signature=s.message_signature,
|
||||
|
||||
@@ -205,6 +205,7 @@ class Sample(Model):
|
||||
timestamp, resource_metadata,
|
||||
message_id,
|
||||
message_signature,
|
||||
recorded_at,
|
||||
):
|
||||
"""Create a new sample.
|
||||
|
||||
@@ -219,6 +220,7 @@ class Sample(Model):
|
||||
:param timestamp: the time of the measurement
|
||||
:param resource_metadata: extra details about the resource
|
||||
:param message_id: a message identifier
|
||||
:param recorded_at: sample record timestamp
|
||||
:param message_signature: a hash created from the rest of the
|
||||
message data
|
||||
"""
|
||||
@@ -234,7 +236,8 @@ class Sample(Model):
|
||||
timestamp=timestamp,
|
||||
resource_metadata=resource_metadata,
|
||||
message_id=message_id,
|
||||
message_signature=message_signature)
|
||||
message_signature=message_signature,
|
||||
recorded_at=recorded_at)
|
||||
|
||||
|
||||
class Statistics(Model):
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
from ceilometer.openstack.common import timeutils
|
||||
from ceilometer.storage.sqlalchemy import models
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
meta = sqlalchemy.MetaData(bind=migrate_engine)
|
||||
meter = sqlalchemy.Table('meter', meta, autoload=True)
|
||||
c = sqlalchemy.Column('recorded_at', models.PreciseTimestamp(),
|
||||
default=timeutils.utcnow)
|
||||
meter.create_column(c)
|
||||
|
||||
|
||||
def downgrade(migrate_engine):
|
||||
meta = sqlalchemy.MetaData(bind=migrate_engine)
|
||||
meter = sqlalchemy.Table('meter', meta, autoload=True)
|
||||
meter.drop_column('recorded_at')
|
||||
@@ -24,12 +24,12 @@ import six.moves.urllib.parse as urlparse
|
||||
from oslo.config import cfg
|
||||
from sqlalchemy import Column, Integer, String, Table, ForeignKey, \
|
||||
Index, UniqueConstraint, BigInteger
|
||||
from sqlalchemy import Float, Boolean, Text
|
||||
from sqlalchemy import Float, Boolean, Text, DateTime
|
||||
from sqlalchemy.dialects.mysql import DECIMAL
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import backref
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.types import TypeDecorator, DATETIME
|
||||
from sqlalchemy.types import TypeDecorator
|
||||
|
||||
from ceilometer.openstack.common import timeutils
|
||||
from ceilometer.storage import models as api_models
|
||||
@@ -71,14 +71,14 @@ class JSONEncodedDict(TypeDecorator):
|
||||
class PreciseTimestamp(TypeDecorator):
|
||||
"""Represents a timestamp precise to the microsecond."""
|
||||
|
||||
impl = DATETIME
|
||||
impl = DateTime
|
||||
|
||||
def load_dialect_impl(self, dialect):
|
||||
if dialect.name == 'mysql':
|
||||
return dialect.type_descriptor(DECIMAL(precision=20,
|
||||
scale=6,
|
||||
asdecimal=True))
|
||||
return dialect.type_descriptor(DATETIME())
|
||||
return self.impl
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
if value is None:
|
||||
@@ -209,6 +209,7 @@ class Meter(Base):
|
||||
counter_unit = Column(String(255))
|
||||
counter_volume = Column(Float(53))
|
||||
timestamp = Column(PreciseTimestamp(), default=timeutils.utcnow)
|
||||
recorded_at = Column(PreciseTimestamp(), default=timeutils.utcnow)
|
||||
message_signature = Column(String(1000))
|
||||
message_id = Column(String(1000))
|
||||
sources = relationship("Source", secondary=lambda: sourceassoc)
|
||||
|
||||
@@ -19,8 +19,9 @@
|
||||
import datetime
|
||||
|
||||
import mock
|
||||
import sqlalchemy
|
||||
from sqlalchemy.dialects.mysql import DECIMAL
|
||||
from sqlalchemy.types import DATETIME, NUMERIC
|
||||
from sqlalchemy.types import NUMERIC
|
||||
|
||||
from ceilometer.openstack.common import test
|
||||
from ceilometer.storage.sqlalchemy import models
|
||||
@@ -34,8 +35,6 @@ class PreciseTimestampTest(test.BaseTestCase):
|
||||
def _type_descriptor_mock(desc):
|
||||
if type(desc) == DECIMAL:
|
||||
return NUMERIC(precision=desc.precision, scale=desc.scale)
|
||||
if type(desc) == DATETIME:
|
||||
return DATETIME()
|
||||
dialect = mock.MagicMock()
|
||||
dialect.name = name
|
||||
dialect.type_descriptor = _type_descriptor_mock
|
||||
@@ -57,7 +56,7 @@ class PreciseTimestampTest(test.BaseTestCase):
|
||||
|
||||
def test_load_dialect_impl_postgres(self):
|
||||
result = self._type.load_dialect_impl(self._postgres_dialect)
|
||||
self.assertEqual(type(result), DATETIME)
|
||||
self.assertEqual(type(result), sqlalchemy.DateTime)
|
||||
|
||||
def test_process_bind_param_store_decimal_mysql(self):
|
||||
expected = utils.dt_to_decimal(self._date)
|
||||
|
||||
@@ -62,7 +62,7 @@ class ModelTest(test.BaseTestCase):
|
||||
"counter_unit", "counter_volume", "user_id",
|
||||
"project_id", "resource_id", "timestamp",
|
||||
"resource_metadata", "message_id",
|
||||
"message_signature"]
|
||||
"message_signature", "recorded_at"]
|
||||
|
||||
self.assertEqual(set(sample_fields),
|
||||
set(models.Sample.get_field_names()))
|
||||
|
||||
@@ -3,7 +3,8 @@
|
||||
# Copyright © 2013 Intel Corp.
|
||||
#
|
||||
# Author: Lianhao Lu <lianhao.lu@intel.com>
|
||||
# Author: Shane Wang <shane.wang@intel.com>
|
||||
# Shane Wang <shane.wang@intel.com>
|
||||
# Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@@ -59,12 +60,10 @@ class DBTestBase(tests_db.TestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(DBTestBase, self).setUp()
|
||||
timeutils.set_time_override(
|
||||
datetime.datetime(2015, 7, 2, 10, 39))
|
||||
self.prepare_data()
|
||||
|
||||
def tearDown(self):
|
||||
timeutils.utcnow.override_time = None
|
||||
super(DBTestBase, self).tearDown()
|
||||
|
||||
def prepare_data(self):
|
||||
original_timestamps = [(2012, 7, 2, 10, 40), (2012, 7, 2, 10, 41),
|
||||
(2012, 7, 2, 10, 41), (2012, 7, 2, 10, 42),
|
||||
@@ -484,6 +483,9 @@ class RawSampleTest(DBTestBase,
|
||||
f = storage.SampleFilter()
|
||||
results = list(self.conn.get_samples(f, limit=3))
|
||||
self.assertEqual(len(results), 3)
|
||||
for result in results:
|
||||
self.assertTimestampEqual(result.recorded_at,
|
||||
timeutils.utcnow())
|
||||
|
||||
def test_get_samples_in_default_order(self):
|
||||
f = storage.SampleFilter()
|
||||
@@ -498,7 +500,11 @@ class RawSampleTest(DBTestBase,
|
||||
results = list(self.conn.get_samples(f))
|
||||
self.assertEqual(len(results), 3)
|
||||
for meter in results:
|
||||
self.assertIn(meter.as_dict(), self.msgs[:3])
|
||||
d = meter.as_dict()
|
||||
self.assertTimestampEqual(d['recorded_at'],
|
||||
timeutils.utcnow())
|
||||
del d['recorded_at']
|
||||
self.assertIn(d, self.msgs[:3])
|
||||
|
||||
def test_get_samples_by_user_limit(self):
|
||||
f = storage.SampleFilter(user='user-id')
|
||||
@@ -513,25 +519,35 @@ class RawSampleTest(DBTestBase,
|
||||
def test_get_samples_by_project(self):
|
||||
f = storage.SampleFilter(project='project-id')
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert results
|
||||
self.assertIsNotNone(results)
|
||||
for meter in results:
|
||||
self.assertIn(meter.as_dict(), self.msgs[:4])
|
||||
d = meter.as_dict()
|
||||
self.assertTimestampEqual(d['recorded_at'],
|
||||
timeutils.utcnow())
|
||||
del d['recorded_at']
|
||||
self.assertIn(d, self.msgs[:4])
|
||||
|
||||
def test_get_samples_by_resource(self):
|
||||
f = storage.SampleFilter(user='user-id', resource='resource-id')
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert results
|
||||
meter = results[1]
|
||||
assert meter is not None
|
||||
self.assertEqual(meter.as_dict(), self.msgs[0])
|
||||
d = meter.as_dict()
|
||||
self.assertEqual(d['recorded_at'], timeutils.utcnow())
|
||||
del d['recorded_at']
|
||||
self.assertEqual(d, self.msgs[0])
|
||||
|
||||
def test_get_samples_by_metaquery(self):
|
||||
q = {'metadata.display_name': 'test-server'}
|
||||
f = storage.SampleFilter(metaquery=q)
|
||||
results = list(self.conn.get_samples(f))
|
||||
assert results
|
||||
self.assertIsNotNone(results)
|
||||
for meter in results:
|
||||
self.assertIn(meter.as_dict(), self.msgs)
|
||||
d = meter.as_dict()
|
||||
self.assertTimestampEqual(d['recorded_at'],
|
||||
timeutils.utcnow())
|
||||
del d['recorded_at']
|
||||
self.assertIn(d, self.msgs)
|
||||
|
||||
def test_get_samples_by_start_time(self):
|
||||
timestamp = datetime.datetime(2012, 7, 2, 10, 41)
|
||||
@@ -744,7 +760,9 @@ class ComplexSampleQueryTest(DBTestBase,
|
||||
results = list(self.conn.query_samples())
|
||||
self.assertEqual(len(results), len(self.msgs))
|
||||
for sample in results:
|
||||
self.assertIn(sample.as_dict(), self.msgs)
|
||||
d = sample.as_dict()
|
||||
del d['recorded_at']
|
||||
self.assertIn(d, self.msgs)
|
||||
|
||||
def test_no_filter_with_zero_limit(self):
|
||||
limit = 0
|
||||
|
||||
Reference in New Issue
Block a user