Implement duration calculation API

Add the API for calculating the duration for a series of
events from a given meter for a given resource.

Replace the old duration calculation in the storage engine API with
get_event_interval().

Change-Id: I54952e760fc5e108fa25d71b601b7ef2a4937e9e
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2012-08-31 10:15:52 -04:00
parent 1af59e12ad
commit d25a3147c1
7 changed files with 412 additions and 29 deletions

View File

@ -52,7 +52,7 @@
# #
# [ ] /projects/<project>/meters/<meter>/duration -- total time for selected # [ ] /projects/<project>/meters/<meter>/duration -- total time for selected
# meter # meter
# [ ] /resources/<resource>/meters/<meter>/duration -- total time for selected # [x] /resources/<resource>/meters/<meter>/duration -- total time for selected
# meter # meter
# [ ] /sources/<source>/meters/<meter>/duration -- total time for selected # [ ] /sources/<source>/meters/<meter>/duration -- total time for selected
# meter # meter
@ -67,6 +67,8 @@
# [ ] /users/<user>/meters/<meter>/volume -- total or max volume for selected # [ ] /users/<user>/meters/<meter>/volume -- total or max volume for selected
# meter # meter
import datetime
import flask import flask
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
@ -311,3 +313,86 @@ def list_events_by_user(user, meter):
return _list_events(user=user, return _list_events(user=user,
meter=meter, meter=meter,
) )
## APIs for working with meter calculations.
@blueprint.route('/resources/<resource>/meters/<meter>/duration')
def compute_duration_by_resource(resource, meter):
"""Return the earliest timestamp, last timestamp,
and duration for the resource and meter.
:param resource: The ID of the resource.
:param meter: The name of the meter.
:param start_timestamp: ISO-formatted string of the
earliest timestamp to return.
:param end_timestamp: ISO-formatted string of the
latest timestamp to return.
:param search_offset: Number of minutes before
and after start and end timestamps to query.
"""
# Determine the desired range, if any, from the
# GET arguments. Set up the query range using
# the specified offset.
# [query_start ... start_timestamp ... end_timestamp ... query_end]
search_offset = int(flask.request.args.get('search_offset', 0))
start_timestamp = flask.request.args.get('start_timestamp')
if start_timestamp:
start_timestamp = timeutils.parse_isotime(start_timestamp)
start_timestamp = start_timestamp.replace(tzinfo=None)
query_start = (start_timestamp -
datetime.timedelta(minutes=search_offset))
else:
query_start = None
end_timestamp = flask.request.args.get('end_timestamp')
if end_timestamp:
end_timestamp = timeutils.parse_isotime(end_timestamp)
end_timestamp = end_timestamp.replace(tzinfo=None)
query_end = end_timestamp + datetime.timedelta(minutes=search_offset)
else:
query_end = None
# Query the database for the interval of timestamps
# within the desired range.
f = storage.EventFilter(meter=meter,
resource=resource,
start=query_start,
end=query_end,
)
min_ts, max_ts = flask.request.storage_conn.get_event_interval(f)
# "Clamp" the timestamps we return to the original time
# range, excluding the offset.
LOG.debug('start_timestamp %s, end_timestamp %s, min_ts %s, max_ts %s',
start_timestamp, end_timestamp, min_ts, max_ts)
if start_timestamp and min_ts and min_ts < start_timestamp:
min_ts = start_timestamp
LOG.debug('clamping min timestamp to range')
if end_timestamp and max_ts and max_ts > end_timestamp:
max_ts = end_timestamp
LOG.debug('clamping max timestamp to range')
# If we got valid timestamps back, compute a duration in minutes.
#
# If the min > max after clamping then we know the
# timestamps on the events fell outside of the time
# range we care about for the query, so treat them as
# "invalid."
#
# If the timestamps are invalid, return None as a
# sentinal indicating that there is something "funny"
# about the range.
if min_ts and max_ts and (min_ts <= max_ts):
# Can't use timedelta.total_seconds() because
# it is not available in Python 2.6.
diff = max_ts - min_ts
duration = (diff.seconds + (diff.days * 24 * 60 ** 2)) / 60
else:
min_ts = max_ts = duration = None
return flask.jsonify(start_timestamp=min_ts,
end_timestamp=max_ts,
duration=duration,
)

View File

@ -125,9 +125,9 @@ class Connection(object):
""" """
@abc.abstractmethod @abc.abstractmethod
def get_duration_sum(self, event_filter): def get_event_interval(self, event_filter):
"""Return the sum of time for the events described by the """Return the min and max timestamps from events,
query parameters. using the event_filter to limit the events seen.
The filter must have a meter value set. ( datetime.datetime(), datetime.datetime() )
""" """

View File

@ -95,7 +95,7 @@ class Connection(base.Connection):
described by the query parameters. described by the query parameters.
""" """
def get_duration_sum(self, event_filter): def get_event_interval(self, event_filter):
"""Return the sum of time for the events described by the """Return the min and max timestamp for events
query parameters. matching the event_filter.
""" """

View File

@ -130,14 +130,6 @@ class Connection(base.Connection):
} }
""") """)
# JavaScript function for doing map-reduce to get a counter
# duration total.
MAP_COUNTER_DURATION = bson.code.Code("""
function() {
emit(this.resource_id, this.counter_duration);
}
""")
# JavaScript function for doing map-reduce to get a maximum value # JavaScript function for doing map-reduce to get a maximum value
# from a range. (from # from a range. (from
# http://cookbook.mongodb.org/patterns/finding_max_and_min/) # http://cookbook.mongodb.org/patterns/finding_max_and_min/)
@ -158,6 +150,28 @@ class Connection(base.Connection):
} }
""") """)
# MAP_TIMESTAMP and REDUCE_MIN_MAX are based on the recipe
# http://cookbook.mongodb.org/patterns/finding_max_and_min_values_for_a_key
MAP_TIMESTAMP = bson.code.Code("""
function () {
emit('timestamp', { min : this.timestamp,
max : this.timestamp } )
}
""")
REDUCE_MIN_MAX = bson.code.Code("""
function (key, values) {
var res = values[0];
for ( var i=1; i<values.length; i++ ) {
if ( values[i].min < res.min )
res.min = values[i].min;
if ( values[i].max > res.max )
res.max = values[i].max;
}
return res;
}
""")
def __init__(self, conf): def __init__(self, conf):
opts = self._parse_connection_url(conf.database_connection) opts = self._parse_connection_url(conf.database_connection)
LOG.info('connecting to MongoDB on %s:%s', opts['host'], opts['port']) LOG.info('connecting to MongoDB on %s:%s', opts['host'], opts['port'])
@ -373,15 +387,46 @@ class Connection(base.Connection):
return ({'resource_id': r['_id'], 'value': r['value']} return ({'resource_id': r['_id'], 'value': r['value']}
for r in results['results']) for r in results['results'])
def get_duration_sum(self, event_filter): def get_event_interval(self, event_filter):
"""Return the sum of time for the events described by the """Return the min and max timestamps from events,
query parameters. using the event_filter to limit the events seen.
( datetime.datetime(), datetime.datetime() )
""" """
q = make_query_from_filter(event_filter) q = make_query_from_filter(event_filter)
results = self.db.meter.map_reduce(self.MAP_COUNTER_DURATION, results = self.db.meter.map_reduce(self.MAP_TIMESTAMP,
self.REDUCE_MAX, self.REDUCE_MIN_MAX,
{'inline': 1}, {'inline': 1},
query=q, query=q,
) )
return ({'resource_id': r['_id'], 'value': r['value']} if results['results']:
for r in results['results']) answer = results['results'][0]['value']
a_min = answer['min']
a_max = answer['max']
if hasattr(a_min, 'valueOf') and a_min.valueOf is not None:
# NOTE (dhellmann): HACK ALERT
#
# The real MongoDB server can handle Date objects and
# the driver converts them to datetime instances
# correctly but the in-memory implementation in MIM
# (used by the tests) returns a spidermonkey.Object
# representing the "value" dictionary and there
# doesn't seem to be a way to recursively introspect
# that object safely to convert the min and max values
# back to datetime objects. In this method, we know
# what type the min and max values are expected to be,
# so it is safe to do the conversion
# here. JavaScript's time representation uses
# different units than Python's, so we divide to
# convert to the right units and then create the
# datetime instances to return.
#
# The issue with MIM is documented at
# https://sourceforge.net/p/merciless/bugs/3/
#
a_min = datetime.datetime.fromtimestamp(
a_min.valueOf() // 1000)
a_max = datetime.datetime.fromtimestamp(
a_max.valueOf() // 1000)
return (a_min, a_max)
return (None, None)

View File

@ -22,11 +22,13 @@ import json
import logging import logging
import os import os
import unittest import unittest
import urllib
import flask import flask
from ming import mim from ming import mim
import mock import mock
from ceilometer.tests import base as test_base
from ceilometer.api import v1 from ceilometer.api import v1
from ceilometer.storage import impl_mongodb from ceilometer.storage import impl_mongodb
@ -50,7 +52,7 @@ class Connection(impl_mongodb.Connection):
return mim.Connection() return mim.Connection()
class TestBase(unittest.TestCase): class TestBase(test_base.TestCase):
DBNAME = 'testdb' DBNAME = 'testdb'
@ -74,8 +76,12 @@ class TestBase(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.conn.conn.drop_database(self.DBNAME) self.conn.conn.drop_database(self.DBNAME)
def get(self, path): def get(self, path, **kwds):
rv = self.test_app.get(path) if kwds:
query = path + '?' + urllib.urlencode(kwds)
else:
query = path
rv = self.test_app.get(query)
try: try:
data = json.loads(rv.data) data = json.loads(rv.data)
except ValueError: except ValueError:

View File

@ -0,0 +1,135 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test listing raw events.
"""
import datetime
import logging
from ceilometer.openstack.common import timeutils
from ceilometer.tests import api as tests_api
LOG = logging.getLogger(__name__)
class TestComputeDurationByResource(tests_api.TestBase):
def setUp(self):
super(TestComputeDurationByResource, self).setUp()
# Create events relative to the range and pretend
# that the intervening events exist.
self.early1 = datetime.datetime(2012, 8, 27, 7, 0)
self.early2 = datetime.datetime(2012, 8, 27, 17, 0)
self.start = datetime.datetime(2012, 8, 28, 0, 0)
self.middle1 = datetime.datetime(2012, 8, 28, 8, 0)
self.middle2 = datetime.datetime(2012, 8, 28, 18, 0)
self.end = datetime.datetime(2012, 8, 28, 23, 59)
self.late1 = datetime.datetime(2012, 8, 29, 9, 0)
self.late2 = datetime.datetime(2012, 8, 29, 19, 0)
def _set_interval(self, start, end):
def get_interval(event_filter):
assert event_filter.start
assert event_filter.end
return (start, end)
self.stubs.Set(self.conn, 'get_event_interval', get_interval)
def _invoke_api(self):
return self.get(
'/resources/resource-id/meters/instance:m1.tiny/duration',
start_timestamp=self.start.isoformat(),
end_timestamp=self.end.isoformat(),
search_offset=10, # this value doesn't matter, db call is mocked
)
def test_before_range(self):
self._set_interval(self.early1, self.early2)
data = self._invoke_api()
assert data['start_timestamp'] is None
assert data['end_timestamp'] is None
assert data['duration'] is None
def _assert_times_match(self, actual, expected):
actual = timeutils.parse_isotime(actual).replace(tzinfo=None)
assert actual == expected
def test_overlap_range_start(self):
self._set_interval(self.early1, self.middle1)
data = self._invoke_api()
self._assert_times_match(data['start_timestamp'], self.start)
self._assert_times_match(data['end_timestamp'], self.middle1)
assert data['duration'] == 8 * 60
def test_within_range(self):
self._set_interval(self.middle1, self.middle2)
data = self._invoke_api()
self._assert_times_match(data['start_timestamp'], self.middle1)
self._assert_times_match(data['end_timestamp'], self.middle2)
assert data['duration'] == 10 * 60
def test_within_range_zero_duration(self):
self._set_interval(self.middle1, self.middle1)
data = self._invoke_api()
self._assert_times_match(data['start_timestamp'], self.middle1)
self._assert_times_match(data['end_timestamp'], self.middle1)
assert data['duration'] == 0
def test_overlap_range_end(self):
self._set_interval(self.middle2, self.late1)
data = self._invoke_api()
self._assert_times_match(data['start_timestamp'], self.middle2)
self._assert_times_match(data['end_timestamp'], self.end)
assert data['duration'] == (6 * 60) - 1
def test_after_range(self):
self._set_interval(self.late1, self.late2)
data = self._invoke_api()
assert data['start_timestamp'] is None
assert data['end_timestamp'] is None
assert data['duration'] is None
def test_without_end_timestamp(self):
def get_interval(event_filter):
return (self.late1, self.late2)
self.stubs.Set(self.conn, 'get_event_interval', get_interval)
data = self.get(
'/resources/resource-id/meters/instance:m1.tiny/duration',
start_timestamp=self.late1.isoformat(),
search_offset=10, # this value doesn't matter, db call is mocked
)
self._assert_times_match(data['start_timestamp'], self.late1)
self._assert_times_match(data['end_timestamp'], self.late2)
def test_without_start_timestamp(self):
def get_interval(event_filter):
return (self.early1, self.early2)
self.stubs.Set(self.conn, 'get_event_interval', get_interval)
data = self.get(
'/resources/resource-id/meters/instance:m1.tiny/duration',
end_timestamp=self.early2.isoformat(),
search_offset=10, # this value doesn't matter, db call is mocked
)
self._assert_times_match(data['start_timestamp'], self.early1)
self._assert_times_match(data['end_timestamp'], self.early2)

View File

@ -85,12 +85,18 @@ class Connection(impl_mongodb.Connection):
class MongoDBEngineTestBase(unittest.TestCase): class MongoDBEngineTestBase(unittest.TestCase):
# Only instantiate the database config
# and connection once, since spidermonkey
# causes issues if we allocate too many
# Runtime objects in the same process.
# http://davisp.lighthouseapp.com/projects/26898/tickets/22
conf = mox.Mox().CreateMockAnything()
conf.database_connection = 'mongodb://localhost/testdb'
conn = Connection(conf)
def setUp(self): def setUp(self):
super(MongoDBEngineTestBase, self).setUp() super(MongoDBEngineTestBase, self).setUp()
self.conf = mox.Mox().CreateMockAnything()
self.conf.database_connection = 'mongodb://localhost/testdb'
self.conn = Connection(self.conf)
self.conn.conn.drop_database('testdb') self.conn.conn.drop_database('testdb')
self.db = self.conn.conn['testdb'] self.db = self.conn.conn['testdb']
self.conn.db = self.db self.conn.db = self.db
@ -428,3 +434,109 @@ class SumTest(MongoDBEngineTestBase):
for r in results) for r in results)
assert counts['resource-id'] == 1 assert counts['resource-id'] == 1
assert set(counts.keys()) == set(['resource-id']) assert set(counts.keys()) == set(['resource-id'])
class TestGetEventInterval(MongoDBEngineTestBase):
def setUp(self):
super(TestGetEventInterval, self).setUp()
# NOTE(dhellmann): mim requires spidermonkey to implement the
# map-reduce functions, so if we can't import it then just
# skip these tests unless we aren't using mim.
try:
import spidermonkey
except:
if isinstance(self.conn.conn, mim.Connection):
raise skip.SkipTest('requires spidermonkey')
# Create events relative to the range and pretend
# that the intervening events exist.
self.start = datetime.datetime(2012, 8, 28, 0, 0)
self.end = datetime.datetime(2012, 8, 29, 0, 0)
self.early1 = self.start - datetime.timedelta(minutes=20)
self.early2 = self.start - datetime.timedelta(minutes=10)
self.middle1 = self.start + datetime.timedelta(minutes=10)
self.middle2 = self.end - datetime.timedelta(minutes=10)
self.late1 = self.end + datetime.timedelta(minutes=10)
self.late2 = self.end + datetime.timedelta(minutes=20)
self._filter = storage.EventFilter(
resource='resource-id',
meter='instance',
start=self.start,
end=self.end,
)
def _make_events(self, *timestamps):
for t in timestamps:
c = counter.Counter(
'test',
'instance',
'cumulative',
1,
'user-id',
'project-id',
'resource-id',
timestamp=t,
duration=0,
resource_metadata={'display_name': 'test-server',
}
)
msg = meter.meter_message_from_counter(c)
self.conn.record_metering_data(msg)
def test_before_range(self):
self._make_events(self.early1, self.early2)
s, e = self.conn.get_event_interval(self._filter)
assert s is None
assert e is None
def test_overlap_range_start(self):
self._make_events(self.early1, self.start, self.middle1)
s, e = self.conn.get_event_interval(self._filter)
assert s == self.start
assert e == self.middle1
def test_within_range(self):
self._make_events(self.middle1, self.middle2)
s, e = self.conn.get_event_interval(self._filter)
assert s == self.middle1
assert e == self.middle2
def test_within_range_zero_duration(self):
self._make_events(self.middle1)
s, e = self.conn.get_event_interval(self._filter)
assert s == self.middle1
assert e == self.middle1
def test_within_range_zero_duration_two_events(self):
self._make_events(self.middle1, self.middle1)
s, e = self.conn.get_event_interval(self._filter)
assert s == self.middle1
assert e == self.middle1
def test_overlap_range_end(self):
self._make_events(self.middle2, self.end, self.late1)
s, e = self.conn.get_event_interval(self._filter)
assert s == self.middle2
assert e == self.middle2
def test_overlap_range_end_with_offset(self):
self._make_events(self.middle2, self.end, self.late1)
self._filter.end = self.late1
s, e = self.conn.get_event_interval(self._filter)
assert s == self.middle2
assert e == self.end
def test_after_range(self):
self._make_events(self.late1, self.late2)
s, e = self.conn.get_event_interval(self._filter)
assert s is None
assert e is None