Convert timestamps to datetime objects before storing

Parse the incoming timestamp strings into datetime objects
before passing them off to the storage backend.

Change-Id: I42d825c9633fa2c120409d79883756d23b045a60
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2012-06-29 15:02:12 -04:00
parent b345efa0be
commit 022ecffe56
5 changed files with 143 additions and 6 deletions

View File

@ -24,9 +24,10 @@ from ceilometer import log
from ceilometer import meter from ceilometer import meter
from ceilometer import publish from ceilometer import publish
from ceilometer import rpc from ceilometer import rpc
from ceilometer.collector import dispatcher
from ceilometer import storage from ceilometer import storage
from ceilometer.collector import dispatcher
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import timeutils
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
# FIXME(dhellmann): There must be another way to do this. # FIXME(dhellmann): There must be another way to do this.
@ -88,15 +89,23 @@ class CollectorManager(manager.Manager):
cast from an agent. cast from an agent.
""" """
#LOG.info('metering data: %r', data) #LOG.info('metering data: %r', data)
LOG.info('metering data %s for %s: %s', LOG.info('metering data %s for %s @ %s: %s',
data['counter_name'], data['counter_name'],
data['resource_id'], data['resource_id'],
data.get('timestamp', 'NO TIMESTAMP'),
data['counter_volume']) data['counter_volume'])
if not meter.verify_signature(data): if not meter.verify_signature(data):
LOG.warning('message signature invalid, discarding message: %r', LOG.warning('message signature invalid, discarding message: %r',
data) data)
else: else:
try: try:
# Convert the timestamp to a datetime instance.
# Storage engines are responsible for converting
# that value to something they can store.
if 'timestamp' in data:
data['timestamp'] = timeutils.parse_isotime(
data['timestamp'],
)
self.storage_conn.record_metering_data(data) self.storage_conn.record_metering_data(data)
except Exception as err: except Exception as err:
LOG.error('Failed to record metering data: %s', err) LOG.error('Failed to record metering data: %s', err)

View File

@ -16,8 +16,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
from lxml import etree from lxml import etree
from nova import flags from nova import flags
@ -27,6 +25,7 @@ from ceilometer import log
from ceilometer import counter from ceilometer import counter
from ceilometer import plugin from ceilometer import plugin
from ceilometer.compute import instance as compute_instance from ceilometer.compute import instance as compute_instance
from ceilometer.openstack.common import timeutils
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@ -42,7 +41,7 @@ def make_counter_from_instance(instance, name, type, volume):
user_id=instance.user_id, user_id=instance.user_id,
project_id=instance.project_id, project_id=instance.project_id,
resource_id=instance.uuid, resource_id=instance.uuid,
timestamp=datetime.datetime.utcnow().isoformat(), timestamp=timeutils.isotime(),
duration=None, duration=None,
resource_metadata=compute_instance.get_metadata_from_dbobject( resource_metadata=compute_instance.get_metadata_from_dbobject(
instance), instance),

View File

@ -0,0 +1,109 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import time
import iso8601
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
def isotime(at=None):
"""Stringify time in ISO 8601 format"""
if not at:
at = utcnow()
str = at.strftime(TIME_FORMAT)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
str += ('Z' if tz == 'UTC' else tz)
return str
def parse_isotime(timestr):
"""Parse time from ISO 8601 format"""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
except TypeError as e:
raise ValueError(e.message)
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow."""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC"""
offset = timestamp.utcoffset()
return timestamp.replace(tzinfo=None) - offset if offset else timestamp
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
return utcnow() - before > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return calendar.timegm(utcnow().timetuple())
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
return utcnow.override_time
return datetime.datetime.utcnow()
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time."""
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overriden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overriden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None

View File

@ -1,3 +1,3 @@
[DEFAULT] [DEFAULT]
modules=cfg,iniparser,rpc,importutils,excutils,local,jsonutils,gettextutils modules=cfg,iniparser,rpc,importutils,excutils,local,jsonutils,gettextutils,timeutils
base=ceilometer base=ceilometer

View File

@ -69,3 +69,23 @@ class TestCollectorManager(test.TestCase):
assert not self.mgr.storage_conn.called, \ assert not self.mgr.storage_conn.called, \
'Should not have called the storage connection' 'Should not have called the storage connection'
def test_timestamp_conversion(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
'timestamp': '2012-07-02T13:53:40Z',
}
msg['message_signature'] = meter.compute_signature(msg)
expected = {}
expected.update(msg)
expected['timestamp'] = datetime.datetime(2012, 7, 2, 13, 53, 40)
self.mgr.storage_conn = self.mox.CreateMock(base.Connection)
self.mgr.storage_conn.record_metering_data(expected)
self.mox.ReplayAll()
self.mgr.record_metering_data(self.ctx, msg)
self.mox.VerifyAll()