Add DataPoint/DataFrame objects

This introduces the DataPoint and DataFrame objects. They replace the
dicts that were used until now:

A DataPoint is an immutable object representing a single measurement for one
metric, with its associated price, qty, groupby and metadata.

A DataFrame is a collection of DataPoints, with helper functions for
point insertion and JSON formatting/loading.
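
A rough usage sketch (illustrative values only, not taken from the diff):

    import datetime
    from cloudkitty import dataframe

    # One measurement: qty/price are stored as Decimal, groupby/metadata
    # as immutable dicts.
    point = dataframe.DataPoint(
        unit='GiB',
        qty=1.2,
        price=0,
        groupby={'project_id': 'some-project'},
        metadata={'flavor': 'm1.nano'},
    )

    # A DataFrame groups points per metric type over a period.
    frame = dataframe.DataFrame(
        start=datetime.datetime(2019, 7, 1),
        end=datetime.datetime(2019, 7, 1, 1),
    )
    frame.add_point(point, 'image.size')
    for metric_type, p in frame.iterpoints():
        print(metric_type, p.qty, p.price)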

Story: 2005890
Task: 35658
Change-Id: I71e95bba0a0cdd049b0fba3e79c7675b6365c37f
Luka Peschke 2019-07-19 12:11:05 +02:00
parent 2b0735d776
commit 2f4acdce4a
23 changed files with 951 additions and 461 deletions

View File

@ -14,7 +14,6 @@
# under the License.
#
import datetime
import decimal
from oslo_config import cfg
import pecan
@ -71,30 +70,21 @@ class DataFramesController(rest.RestController):
except storage.NoTimeFrame:
return storage_models.DataFrameCollection(dataframes=[])
for frame in resp['dataframes']:
for service, data_list in frame['usage'].items():
frame_tenant = None
resources = []
for data in data_list:
# This means we use a v1 storage backend
if 'desc' in data.keys():
desc = data['desc']
else:
desc = data['metadata'].copy()
desc.update(data.get('groupby', {}))
price = decimal.Decimal(str(data['rating']['price']))
resources = []
frame_tenant = None
for type_, points in frame.itertypes():
for point in points:
resource = storage_models.RatedResource(
service=service,
desc=desc,
volume=data['vol']['qty'],
rating=price)
service=type_,
desc=point.desc,
volume=point.qty,
rating=point.price)
if frame_tenant is None:
frame_tenant = desc[scope_key]
frame_tenant = point.desc[scope_key]
resources.append(resource)
dataframe = storage_models.DataFrame(
begin=tzutils.local_to_utc(
frame['period']['begin'], naive=True),
end=tzutils.local_to_utc(
frame['period']['end'], naive=True),
begin=tzutils.local_to_utc(frame.start, naive=True),
end=tzutils.local_to_utc(frame.end, naive=True),
tenant_id=frame_tenant,
resources=resources)
dataframes.append(dataframe)

View File

@ -277,7 +277,7 @@ class BaseCollector(object):
if not data:
raise NoDataCollected(self.collector_name, name)
return self.t_cloudkitty.format_service(name, data)
return name, data
def validate_conf(conf):

cloudkitty/dataframe.py (new file, 279 lines)
View File

@ -0,0 +1,279 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import collections
import datetime
import decimal
import functools
import voluptuous
from werkzeug import datastructures
from cloudkitty import json_utils as json
from cloudkitty import tzutils
from cloudkitty import validation_utils as vutils
# NOTE(peschk_l): qty and price are converted to strings to avoid
# floating-point conversion issues:
# Decimal(0.121) == Decimal('0.12099999999999999644728632119')
# Decimal(str(0.121)) == Decimal('0.121')
DATAPOINT_SCHEMA = voluptuous.Schema({
voluptuous.Required('vol'): {
voluptuous.Required('unit'): vutils.get_string_type(),
voluptuous.Required('qty'): voluptuous.Coerce(str),
},
voluptuous.Required('rating', default={}): {
voluptuous.Required('price', default=0):
voluptuous.Coerce(str),
},
voluptuous.Required('groupby'): vutils.DictTypeValidator(str, str),
voluptuous.Required('metadata'): vutils.DictTypeValidator(str, str),
})
_DataPointBase = collections.namedtuple(
"DataPoint",
field_names=("unit", "qty", "price", "groupby", "metadata"))
class DataPoint(_DataPointBase):
def __new__(cls, unit, qty, price, groupby, metadata):
return _DataPointBase.__new__(
cls,
unit or "undefined",
# NOTE(peschk_l): avoids floating-point issues.
decimal.Decimal(str(qty) if isinstance(qty, float) else qty),
decimal.Decimal(str(price) if isinstance(price, float) else price),
datastructures.ImmutableDict(groupby),
datastructures.ImmutableDict(metadata),
)
def set_price(self, price):
"""Sets the price of the DataPoint and returns a new object."""
return self._replace(price=price)
def as_dict(self, legacy=False, mutable=False):
"""Returns a dict representation of the object.
The returned dict is immutable by default and has the
following format::
{
"vol": {
"unit": "GiB",
"qty": 1.2,
},
"rating": {
"price": 0.04,
},
"groupby": {
"group_one": "one",
"group_two": "two",
},
"metadata": {
"attr_one": "one",
"attr_two": "two",
},
}
The dict can also be returned in the legacy (v1 storage) format. In
that case, `groupby` and `metadata` will be removed and merged together
into the `desc` key.
:param legacy: Defaults to False. If True, returned dict is in legacy
format.
:type legacy: bool
:param mutable: Defaults to False. If True, returns a normal dict
instead of an ImmutableDict.
:type mutable: bool
"""
output = {
"vol": {
"unit": self.unit,
"qty": self.qty,
},
"rating": {
"price": self.price,
},
"groupby": dict(self.groupby) if mutable else self.groupby,
"metadata": dict(self.metadata) if mutable else self.metadata,
}
if legacy:
desc = output.pop("metadata")
desc.update(output.pop("groupby"))
output['desc'] = desc
return output if mutable else datastructures.ImmutableDict(output)
def json(self, legacy=False):
"""Returns a json representation of the dict returned by `as_dict`.
:param legacy: Defaults to False. If True, returned dict is in legacy
format.
:type legacy: bool
:rtype: str
"""
return json.dumps(self.as_dict(legacy=legacy, mutable=True))
@classmethod
def from_dict(cls, dict_, legacy=False):
"""Returns a new DataPoint instance build from a dict.
:param dict_: Dict to build the DataPoint from
:type dict_: dict
:param legacy: Set to True to convert the dict to the new format
before validating it.
:rtype: DataPoint
"""
try:
if legacy:
dict_['groupby'] = dict_.pop('desc')
dict_['metadata'] = {}
valid = DATAPOINT_SCHEMA(dict_)
return cls(
unit=valid["vol"]["unit"],
qty=valid["vol"]["qty"],
price=valid["rating"]["price"],
groupby=valid["groupby"],
metadata=valid["metadata"],
)
except (voluptuous.Invalid, KeyError) as e:
raise ValueError("{} isn't a valid DataPoint: {}".format(dict_, e))
@property
def desc(self):
output = dict(self.metadata)
output.update(self.groupby)
return datastructures.ImmutableDict(output)
DATAFRAME_SCHEMA = voluptuous.Schema({
voluptuous.Required('period'): {
voluptuous.Required('begin'): voluptuous.Any(
datetime.datetime, voluptuous.Coerce(tzutils.dt_from_iso)),
voluptuous.Required('end'): voluptuous.Any(
datetime.datetime, voluptuous.Coerce(tzutils.dt_from_iso)),
},
voluptuous.Required('usage'): vutils.IterableValuesDict(
str, DataPoint.from_dict),
})
class DataFrame(object):
__slots__ = ("start", "end", "_usage")
def __init__(self, start, end, usage=None):
if not isinstance(start, datetime.datetime):
raise TypeError(
'"start" must be of type datetime.datetime, not {}'.format(
type(start)))
if not isinstance(end, datetime.datetime):
raise TypeError(
'"end" must be of type datetime.datetime, not {}'.format(
type(end)))
if usage is not None and not isinstance(usage, dict):
raise TypeError(
'"usage" must be a dict, not {}'.format(type(usage)))
self.start = start
self.end = end
self._usage = collections.OrderedDict()
if usage:
for key in sorted(usage.keys()):
self.add_points(usage[key], key)
def as_dict(self, legacy=False, mutable=False):
output = {
"period": {"begin": self.start, "end": self.end},
"usage": {
key: [v.as_dict(legacy=legacy, mutable=mutable) for v in val]
for key, val in self._usage.items()
},
}
return output if mutable else datastructures.ImmutableDict(output)
def json(self, legacy=False):
return json.dumps(self.as_dict(legacy=legacy, mutable=True))
@classmethod
def from_dict(cls, dict_, legacy=False):
try:
schema = DATAFRAME_SCHEMA
if legacy:
validator = functools.partial(DataPoint.from_dict, legacy=True)
# NOTE(peschk_l): __name__ is required for voluptuous exception
# message formatting
validator.__name__ = 'DataPoint.from_dict'
# NOTE(peschk_l): In case the legacy format is required, we
# create a new schema where DataPoint.from_dict is called with
# legacy=True. The "extend" method does create a new objects,
# and replaces existing keys with new ones.
schema = DATAFRAME_SCHEMA.extend({
voluptuous.Required('usage'): vutils.IterableValuesDict(
str, validator
),
})
valid = schema(dict_)
return cls(
valid["period"]["begin"],
valid["period"]["end"],
usage=valid["usage"])
except (voluptuous.error.Invalid, KeyError) as e:
raise ValueError("{} isn't a valid DataFrame: {}".format(dict_, e))
def add_points(self, points, type_):
"""Adds multiple points to the DataFrame
:param points: DataPoints to add.
:type points: list of DataPoint
"""
if type_ in self._usage:
self._usage[type_] += points
else:
self._usage[type_] = points
def add_point(self, point, type_):
"""Adds a single point to the DataFrame
:param point: DataPoint to add.
:type point: DataPoint
"""
if type_ in self._usage:
self._usage[type_].append(point)
else:
self._usage[type_] = [point]
def iterpoints(self):
"""Iterates over all datapoints of the dataframe.
Yields (type, point) tuples.
:rtype: (str, DataPoint)
"""
for type_, points in self._usage.items():
for point in points:
yield type_, point
def itertypes(self):
"""Iterates over all types of the dataframe.
Yields (type, (point, )) tuples.
:rtype: (str, (DataPoint, ))
"""
for type_, points in self._usage.items():
yield type_, points
def __repr__(self):
return 'DataFrame(metrics=[{}])'.format(','.join(self._usage.keys()))
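
A quick sketch of how the legacy-format support above is meant to be used (illustrative values, not part of the file):

    legacy_frame = {
        'period': {'begin': '2019-07-01T00:00:00+00:00',
                   'end': '2019-07-01T01:00:00+00:00'},
        'usage': {'cpu': [{
            'vol': {'unit': 'instance', 'qty': 1},
            'rating': {'price': 0.42},
            'desc': {'project_id': 'some-project', 'flavor': 'm1.nano'},
        }]},
    }
    frame = DataFrame.from_dict(legacy_frame, legacy=True)
    # With legacy=True, 'desc' is mapped to 'groupby' and metadata is empty.
    for metric_type, point in frame.iterpoints():
        assert point.desc['project_id'] == 'some-project'
    # Serialization back to the new (non-legacy) format:
    print(frame.json())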

View File

@ -34,6 +34,7 @@ from tooz import coordination
from cloudkitty import collector
from cloudkitty import config # noqa
from cloudkitty import dataframe
from cloudkitty import extension_manager
from cloudkitty import messaging
from cloudkitty import storage
@ -249,18 +250,16 @@ class Worker(BaseWorker):
next_timestamp = tzutils.add_delta(
start_timestamp, timedelta(seconds=self._period))
raw_data = self._collector.retrieve(
name, data = self._collector.retrieve(
metric,
start_timestamp,
next_timestamp,
self._tenant_id,
)
if not raw_data:
if not data:
raise collector.NoDataCollected
return {'period': {'begin': start_timestamp,
'end': next_timestamp},
'usage': raw_data}
return name, data
def _do_collection(self, metrics, timestamp):
@ -276,7 +275,7 @@ class Worker(BaseWorker):
metric=metric,
ts=timestamp)
)
return None
return metric, None
except Exception as e:
LOG.warning(
'[scope: {scope}, worker: {worker}] Error while collecting'
@ -293,8 +292,8 @@ class Worker(BaseWorker):
# system in workers
sys.exit(1)
return list(filter(
lambda x: x is not None,
return dict(filter(
lambda x: x[1] is not None,
eventlet.GreenPool(size=CONF.orchestrator.max_greenthreads).imap(
_get_result, metrics)))
@ -307,14 +306,20 @@ class Worker(BaseWorker):
metrics = list(self._conf['metrics'].keys())
# Collection
data = self._do_collection(metrics, timestamp)
usage_data = self._do_collection(metrics, timestamp)
frame = dataframe.DataFrame(
start=timestamp,
end=tzutils.add_delta(timestamp,
timedelta(seconds=self._period)),
usage=usage_data,
)
# Rating
for processor in self._processors:
processor.obj.process(data)
frame = processor.obj.process(frame)
# Writing
self._storage.push(data, self._tenant_id)
self._storage.push([frame], self._tenant_id)
self._state.set_state(self._tenant_id, timestamp)

View File

@ -15,6 +15,7 @@
#
import decimal
from cloudkitty import dataframe
from cloudkitty import rating
from cloudkitty.rating.hash.controllers import root as root_api
from cloudkitty.rating.hash.db import api as hash_db_api
@ -149,9 +150,7 @@ class HashMap(rating.RatingProcessorBase):
field_name = field_db.name
self._load_field_entries(service_name, field_name, field_uuid)
def add_rating_informations(self, data):
if 'rating' not in data:
data['rating'] = {'price': 0}
def add_rating_informations(self, point):
for entry in self._res.values():
rate = entry['rate']
flat = entry['flat']
@ -161,14 +160,14 @@ class HashMap(rating.RatingProcessorBase):
else:
rate *= entry['threshold']['cost']
res = rate * flat
# FIXME(sheeprine): Added here to ensure that qty is decimal
res *= decimal.Decimal(data['vol']['qty'])
res *= point.qty
if entry['threshold']['scope'] == 'service':
if entry['threshold']['type'] == 'flat':
res += entry['threshold']['cost']
else:
res *= entry['threshold']['cost']
data['rating']['price'] += res
point = point.set_price(point.price + res)
return point
def update_result(self,
group,
@ -228,7 +227,7 @@ class HashMap(rating.RatingProcessorBase):
True,
threshold_type)
def process_services(self, service_name, data):
def process_services(self, service_name, point):
if service_name not in self._entries:
return
service_mappings = self._entries[service_name]['mappings']
@ -238,15 +237,15 @@ class HashMap(rating.RatingProcessorBase):
mapping['cost'])
service_thresholds = self._entries[service_name]['thresholds']
self.process_thresholds(service_thresholds,
data['vol']['qty'],
point.qty,
'service')
def process_fields(self, service_name, data):
def process_fields(self, service_name, point):
if service_name not in self._entries:
return
if 'fields' not in self._entries[service_name]:
return
desc_data = data['desc']
desc_data = point.desc
field_mappings = self._entries[service_name]['fields']
for field_name, group_mappings in field_mappings.items():
if field_name not in desc_data:
@ -260,12 +259,12 @@ class HashMap(rating.RatingProcessorBase):
'field')
def process(self, data):
for cur_data in data:
cur_usage = cur_data['usage']
for service_name, service_data in cur_usage.items():
for item in service_data:
self._res = {}
self.process_services(service_name, item)
self.process_fields(service_name, item)
self.add_rating_informations(item)
return data
output = dataframe.DataFrame(start=data.start, end=data.end)
for service_name, point in data.iterpoints():
self._res = {}
self.process_services(service_name, point)
self.process_fields(service_name, point)
output.add_point(self.add_rating_informations(point), service_name)
return output

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import decimal
from cloudkitty import rating
@ -39,10 +37,4 @@ class Noop(rating.RatingProcessorBase):
pass
def process(self, data):
for cur_data in data:
cur_usage = cur_data['usage']
for service in cur_usage:
for entry in cur_usage[service]:
if 'rating' not in entry:
entry['rating'] = {'price': decimal.Decimal(0)}
return data

View File

@ -80,5 +80,5 @@ class PyScripts(rating.RatingProcessorBase):
def process(self, data):
for script in self._scripts.values():
self.start_script(script['code'], data)
data = self.start_script(script['code'], data)
return data

View File

@ -19,6 +19,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver
from cloudkitty import dataframe
from cloudkitty.storage import v2 as storage_v2
from cloudkitty import tzutils
@ -76,18 +77,19 @@ class V1StorageAdapter(storage_v2.BaseStorage):
@staticmethod
def __update_frames_timestamps(func, frames, **kwargs):
for frame in frames:
period = frame['period'] if 'period' in frame.keys() else frame
begin = period['begin']
end = period['end']
if begin:
period['begin'] = func(begin, **kwargs)
start = frame.start
end = frame.end
if start:
frame.start = func(start, **kwargs)
if end:
period['end'] = func(end, **kwargs)
frame.end = func(end, **kwargs)
def push(self, dataframes, scope_id=None):
if dataframes:
self._make_dataframes_naive(dataframes)
self.storage.append(dataframes, scope_id)
self.storage.append(
[d.as_dict(mutable=True, legacy=True) for d in dataframes],
scope_id)
self.storage.commit(scope_id)
@staticmethod
@ -107,12 +109,24 @@ class V1StorageAdapter(storage_v2.BaseStorage):
tzutils.local_to_utc(end, naive=True) if end else None,
res_type=metric_types,
tenant_id=tenant_id)
frames = [dataframe.DataFrame.from_dict(frame, legacy=True)
for frame in frames]
self._localize_dataframes(frames)
return {
'total': len(frames),
'dataframes': frames,
}
@staticmethod
def _localize_total(iterable):
for elem in iterable:
begin = elem['begin']
end = elem['end']
if begin:
elem['begin'] = tzutils.utc_to_local(begin)
if end:
elem['end'] = tzutils.utc_to_local(end)
def total(self, groupby=None,
begin=None, end=None,
metric_types=None,
@ -145,8 +159,7 @@ class V1StorageAdapter(storage_v2.BaseStorage):
t['type'] = t.get('res_type')
else:
t['type'] = None
self._localize_dataframes(total)
self._localize_total(total)
return {
'total': len(total),
'results': total,

View File

@ -65,7 +65,7 @@ class RatedDataFrame(Base, models.ModelBase):
res_dict['rating'] = rating_dict
res_dict['desc'] = json.loads(self.desc)
res_dict['vol'] = vol_dict
res_dict['tenant_id'] = self.tenant_id
res_dict['desc']['tenant_id'] = self.tenant_id
# Add resource to the usage dict
usage_dict = {}

View File

@ -53,39 +53,8 @@ class BaseStorage(object):
def push(self, dataframes, scope_id=None):
"""Pushes dataframes to the storage backend
A dataframe has the following format::
{
"usage": {
"bananas": [ # metric name
{
"vol": {
"unit": "banana",
"qty": 1
},
"rating": {
"price": 1
},
"groupby": {
"xxx_id": "hello",
"yyy_id": "bye",
},
"metadata": {
"flavor": "chocolate",
"eaten_by": "gorilla",
},
}
],
"metric_name2": [...],
}
"period": {
"begin": "1239781290", # timestamp
"end": "1239793490", # timestamp
}
}
:param dataframes: List of dataframes
:type dataframes: list
:type dataframes: [cloudkitty.dataframe.DataFrame]
"""
@abc.abstractmethod

View File

@ -12,15 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import datetime
import decimal
import influxdb
from oslo_config import cfg
from oslo_log import log
import six
from cloudkitty import dataframe
from cloudkitty.storage import v2 as v2_storage
from cloudkitty import tzutils
@ -112,21 +111,27 @@ class InfluxClient(object):
def append_point(self,
metric_type,
timestamp,
qty, price, unit,
fields, tags):
"""Adds two points to commit to InfluxDB"""
point):
"""Adds a point to commit to InfluxDB.
measurement_fields = copy.deepcopy(fields)
measurement_fields['qty'] = float(qty)
measurement_fields['price'] = float(price)
measurement_fields['unit'] = unit
:param metric_type: Name of the metric type
:type metric_type: str
:param timestamp: Timestamp of the point
:type timestamp: datetime.datetime
:param point: Point to push
:type point: dataframe.DataPoint
"""
measurement_fields = dict(point.metadata)
measurement_fields['qty'] = float(point.qty)
measurement_fields['price'] = float(point.price)
measurement_fields['unit'] = point.unit
# Unfortunately, this seems to be the fastest way: Having several
# measurements would imply a high client-side workload, and this allows
# us to filter out unrequired keys
measurement_fields['groupby'] = '|'.join(tags.keys())
measurement_fields['metadata'] = '|'.join(fields.keys())
measurement_fields['groupby'] = '|'.join(point.groupby.keys())
measurement_fields['metadata'] = '|'.join(point.metadata.keys())
measurement_tags = copy.deepcopy(tags)
measurement_tags = dict(point.groupby)
measurement_tags['type'] = metric_type
self._points.append({
@ -243,19 +248,10 @@ class InfluxStorage(v2_storage.BaseStorage):
def push(self, dataframes, scope_id=None):
for dataframe in dataframes:
timestamp = dataframe['period']['begin']
for metric_name, metrics in dataframe['usage'].items():
for metric in metrics:
self._conn.append_point(
metric_name,
timestamp,
metric['vol']['qty'],
metric['rating']['price'],
metric['vol']['unit'],
metric['metadata'],
metric['groupby'],
)
for frame in dataframes:
timestamp = frame.start
for type_, point in frame.iterpoints():
self._conn.append_point(type_, timestamp, point)
self._conn.commit()
@ -269,21 +265,17 @@ class InfluxStorage(v2_storage.BaseStorage):
@staticmethod
def _point_to_dataframe_entry(point):
groupby = (point.pop('groupby', None) or '').split('|')
groupby = [g for g in groupby if g]
metadata = (point.pop('metadata', None) or '').split('|')
metadata = [m for m in metadata if m]
return {
'vol': {
'unit': point['unit'],
'qty': decimal.Decimal(point['qty']),
},
'rating': {
'price': point['price'],
},
'groupby': {key: point.get(key, '') for key in groupby},
'metadata': {key: point.get(key, '') for key in metadata},
}
groupby = filter(lambda x: bool(x),
(point.pop('groupby', None) or '').split('|'))
metadata = filter(lambda x: bool(x),
(point.pop('metadata', None) or '').split('|'))
return dataframe.DataPoint(
point['unit'],
point['qty'],
point['price'],
{key: point.get(key, '') for key in groupby},
{key: point.get(key, '') for key in metadata},
)
def _build_dataframes(self, points):
dataframes = {}
@ -291,21 +283,17 @@ class InfluxStorage(v2_storage.BaseStorage):
point_type = point['type']
time = tzutils.dt_from_iso(point['time'])
if time not in dataframes.keys():
dataframes[time] = {
'period': {
'begin': time,
'end': tzutils.add_delta(
time, datetime.timedelta(seconds=self._period))
},
'usage': {},
}
usage = dataframes[time]['usage']
if point_type not in usage.keys():
usage[point_type] = []
usage[point_type].append(self._point_to_dataframe_entry(point))
dataframes[time] = dataframe.DataFrame(
start=time,
end=tzutils.add_delta(
time, datetime.timedelta(seconds=self._period)),
)
dataframes[time].add_point(
self._point_to_dataframe_entry(point), point_type)
output = list(dataframes.values())
output.sort(key=lambda x: x['period']['begin'])
output.sort(key=lambda frame: frame.start)
return output
def retrieve(self, begin=None, end=None,

View File

@ -21,6 +21,7 @@ from cloudkitty import collector
from cloudkitty.collector import exceptions
from cloudkitty.collector import prometheus
from cloudkitty.common.prometheus_client import PrometheusResponseError
from cloudkitty import dataframe
from cloudkitty import tests
from cloudkitty.tests import samples
from cloudkitty import transformer
@ -119,34 +120,17 @@ class PrometheusCollectorTest(tests.TestCase):
self.assertEqual(expected, actual)
def test_format_retrieve(self):
expected = {
'http_requests_total': [
{
'desc': {
'bar': '', 'foo': '', 'project_id': '',
'code': '200', 'instance': 'localhost:9090',
},
'groupby': {'bar': '', 'foo': '', 'project_id': ''},
'metadata': {'code': '200', 'instance': 'localhost:9090'},
'vol': {
'qty': Decimal('7'),
'unit': 'instance'
}
},
{
'desc': {
'bar': '', 'foo': '', 'project_id': '',
'code': '200', 'instance': 'localhost:9090',
},
'groupby': {'bar': '', 'foo': '', 'project_id': ''},
'metadata': {'code': '200', 'instance': 'localhost:9090'},
'vol': {
'qty': Decimal('42'),
'unit': 'instance'
}
}
]
}
expected_name = 'http_requests_total'
expected_data = [
dataframe.DataPoint(
'instance', '7', '0',
{'bar': '', 'foo': '', 'project_id': ''},
{'code': '200', 'instance': 'localhost:9090'}),
dataframe.DataPoint(
'instance', '42', '0',
{'bar': '', 'foo': '', 'project_id': ''},
{'code': '200', 'instance': 'localhost:9090'}),
]
no_response = mock.patch(
'cloudkitty.common.prometheus_client.PrometheusClient.get_instant',
@ -154,7 +138,7 @@ class PrometheusCollectorTest(tests.TestCase):
)
with no_response:
actual = self.collector.retrieve(
actual_name, actual_data = self.collector.retrieve(
metric_name='http_requests_total',
start=samples.FIRST_PERIOD_BEGIN,
end=samples.FIRST_PERIOD_END,
@ -162,7 +146,8 @@ class PrometheusCollectorTest(tests.TestCase):
q_filter=None,
)
self.assertEqual(expected, actual)
self.assertEqual(expected_name, actual_name)
self.assertEqual(expected_data, actual_data)
def test_format_retrieve_raise_NoDataCollected(self):
no_response = mock.patch(

View File

@ -14,10 +14,12 @@
# under the License.
#
import abc
import collections
import datetime
import decimal
import os
from dateutil import tz
from gabbi import fixture
import mock
from oslo_config import cfg
@ -35,6 +37,7 @@ import wsmeext.pecan as wsme_pecan
from cloudkitty.api import app
from cloudkitty.api import middleware
from cloudkitty import dataframe
from cloudkitty import db
from cloudkitty.db import api as ck_db_api
from cloudkitty import messaging
@ -48,7 +51,8 @@ from cloudkitty.tests import utils as test_utils
from cloudkitty import tzutils
from cloudkitty import utils as ck_utils
INITIAL_TIMESTAMP = 1420070400
INITIAL_DT = datetime.datetime(2015, 1, 1, tzinfo=tz.UTC)
class UUIDFixture(fixture.GabbiFixture):
@ -294,41 +298,31 @@ class QuoteFakeRPC(BaseFakeRPC):
class BaseStorageDataFixture(fixture.GabbiFixture):
def create_fake_data(self, begin, end, project_id):
if isinstance(begin, int):
begin = ck_utils.ts2dt(begin)
if isinstance(end, int):
end = ck_utils.ts2dt(end)
data = [{
"period": {
"begin": begin,
"end": end},
"usage": {
"cpu": [
{
"desc": {
"dummy": True,
"fake_meta": 1.0,
"project_id": project_id},
"vol": {
"qty": 1,
"unit": "nothing"},
"rating": {
"price": decimal.Decimal('1.337')}}]}}, {
"period": {
"begin": begin,
"end": end},
"usage": {
"image.size": [
{
"desc": {
"dummy": True,
"fake_meta": 1.0,
"project_id": project_id},
"vol": {
"qty": 1,
"unit": "nothing"},
"rating": {
"price": decimal.Decimal('0.121')}}]}}]
cpu_point = dataframe.DataPoint(
unit="nothing",
qty=1,
groupby={"fake_meta": 1.0, "project_id": project_id},
metadata={"dummy": True},
price=decimal.Decimal('1.337'),
)
image_point = dataframe.DataPoint(
unit="nothing",
qty=1,
groupby={"fake_meta": 1.0, "project_id": project_id},
metadata={"dummy": True},
price=decimal.Decimal('0.121'),
)
data = [
dataframe.DataFrame(
start=begin, end=end,
usage=collections.OrderedDict({"cpu": [cpu_point]}),
),
dataframe.DataFrame(
start=begin, end=end,
usage=collections.OrderedDict({"image.size": [image_point]}),
),
]
return data
def start_fixture(self):
@ -356,33 +350,38 @@ class BaseStorageDataFixture(fixture.GabbiFixture):
class StorageDataFixture(BaseStorageDataFixture):
def initialize_data(self):
nodata_duration = (24 * 3 + 12) * 3600
hour_delta = datetime.timedelta(seconds=3600)
tenant_list = ['8f82cc70-e50c-466e-8624-24bdea811375',
'7606a24a-b8ad-4ae0-be6c-3d7a41334a2e']
data_ts = INITIAL_TIMESTAMP + nodata_duration + 3600
data_duration = (24 * 2 + 8) * 3600
for i in range(data_ts,
data_ts + data_duration,
3600):
data_dt = INITIAL_DT + datetime.timedelta(
seconds=nodata_duration + 3600)
data_duration = datetime.timedelta(seconds=(24 * 2 + 8) * 3600)
iter_dt = data_dt
while iter_dt < data_dt + data_duration:
data = self.create_fake_data(
i, i + 3600, tenant_list[0])
iter_dt, iter_dt + hour_delta, tenant_list[0])
self.storage.push(data, tenant_list[0])
half_duration = int(data_duration / 2)
for i in range(data_ts,
data_ts + half_duration,
3600):
data = self.create_fake_data(i, i + 3600, tenant_list[1])
iter_dt += hour_delta
iter_dt = data_dt
while iter_dt < data_dt + data_duration / 2:
data = self.create_fake_data(
iter_dt, iter_dt + hour_delta, tenant_list[1])
self.storage.push(data, tenant_list[1])
iter_dt += hour_delta
class NowStorageDataFixture(BaseStorageDataFixture):
def initialize_data(self):
begin = ck_utils.get_month_start_timestamp()
for i in range(begin,
begin + 3600 * 12,
3600):
dt = tzutils.get_month_start(naive=True).replace(tzinfo=tz.UTC)
hour_delta = datetime.timedelta(seconds=3600)
limit = dt + hour_delta * 12
while dt < limit:
project_id = '3d9a1b33-482f-42fd-aef9-b575a3da9369'
data = self.create_fake_data(i, i + 3600, project_id)
data = self.create_fake_data(dt, dt + hour_delta, project_id)
self.storage.push(data, project_id)
dt += hour_delta
class ScopeStateFixture(fixture.GabbiFixture):

View File

@ -75,8 +75,8 @@ tests:
$.dataframes[0].resources[0].volume: "1"
$.dataframes[0].resources[0].rating: "1.337"
$.dataframes[0].resources[0].service: "cpu"
$.dataframes[0].resources[0].desc.dummy: true
$.dataframes[0].resources[0].desc.fake_meta: 1.0
$.dataframes[0].resources[0].desc.dummy: 'True'
$.dataframes[0].resources[0].desc.fake_meta: '1.0'
$.dataframes[1].tenant_id: "8f82cc70-e50c-466e-8624-24bdea811375"
$.dataframes[1].begin: "2015-01-04T13:00:00"
$.dataframes[1].end: "2015-01-04T14:00:00"
@ -84,8 +84,8 @@ tests:
$.dataframes[1].resources[0].volume: "1"
$.dataframes[1].resources[0].rating: "0.121"
$.dataframes[1].resources[0].service: "image.size"
$.dataframes[1].resources[0].desc.dummy: true
$.dataframes[1].resources[0].desc.fake_meta: 1.0
$.dataframes[1].resources[0].desc.dummy: 'True'
$.dataframes[1].resources[0].desc.fake_meta: '1.0'
- name: fetch data for the second tenant
url: /v1/storage/dataframes
@ -103,8 +103,8 @@ tests:
$.dataframes[0].resources[0].volume: "1"
$.dataframes[0].resources[0].rating: "1.337"
$.dataframes[0].resources[0].service: "cpu"
$.dataframes[0].resources[0].desc.dummy: true
$.dataframes[0].resources[0].desc.fake_meta: 1.0
$.dataframes[0].resources[0].desc.dummy: 'True'
$.dataframes[0].resources[0].desc.fake_meta: '1.0'
$.dataframes[1].tenant_id: "7606a24a-b8ad-4ae0-be6c-3d7a41334a2e"
$.dataframes[1].begin: "2015-01-04T13:00:00"
$.dataframes[1].end: "2015-01-04T14:00:00"
@ -112,8 +112,8 @@ tests:
$.dataframes[1].resources[0].volume: "1"
$.dataframes[1].resources[0].rating: "0.121"
$.dataframes[1].resources[0].service: "image.size"
$.dataframes[1].resources[0].desc.dummy: true
$.dataframes[1].resources[0].desc.fake_meta: 1.0
$.dataframes[1].resources[0].desc.dummy: 'True'
$.dataframes[1].resources[0].desc.fake_meta: '1.0'
- name: fetch data for multiple tenants
url: /v1/storage/dataframes
@ -130,8 +130,8 @@ tests:
$.dataframes[0].resources[0].volume: "1"
$.dataframes[0].resources[0].rating: "1.337"
$.dataframes[0].resources[0].service: "cpu"
$.dataframes[0].resources[0].desc.dummy: true
$.dataframes[0].resources[0].desc.fake_meta: 1.0
$.dataframes[0].resources[0].desc.dummy: 'True'
$.dataframes[0].resources[0].desc.fake_meta: '1.0'
$.dataframes[1].tenant_id: "8f82cc70-e50c-466e-8624-24bdea811375"
$.dataframes[1].begin: "2015-01-04T13:00:00"
$.dataframes[1].end: "2015-01-04T14:00:00"
@ -139,8 +139,8 @@ tests:
$.dataframes[1].resources[0].volume: "1"
$.dataframes[1].resources[0].rating: "0.121"
$.dataframes[1].resources[0].service: "image.size"
$.dataframes[1].resources[0].desc.dummy: true
$.dataframes[1].resources[0].desc.fake_meta: 1.0
$.dataframes[1].resources[0].desc.dummy: 'True'
$.dataframes[1].resources[0].desc.fake_meta: '1.0'
$.dataframes[2].tenant_id: "7606a24a-b8ad-4ae0-be6c-3d7a41334a2e"
$.dataframes[2].begin: "2015-01-04T13:00:00"
$.dataframes[2].end: "2015-01-04T14:00:00"
@ -148,8 +148,8 @@ tests:
$.dataframes[2].resources[0].volume: "1"
$.dataframes[2].resources[0].rating: "1.337"
$.dataframes[2].resources[0].service: "cpu"
$.dataframes[2].resources[0].desc.dummy: true
$.dataframes[2].resources[0].desc.fake_meta: 1.0
$.dataframes[2].resources[0].desc.dummy: 'True'
$.dataframes[2].resources[0].desc.fake_meta: '1.0'
$.dataframes[3].tenant_id: "7606a24a-b8ad-4ae0-be6c-3d7a41334a2e"
$.dataframes[3].begin: "2015-01-04T13:00:00"
$.dataframes[3].end: "2015-01-04T14:00:00"
@ -157,8 +157,8 @@ tests:
$.dataframes[3].resources[0].volume: "1"
$.dataframes[3].resources[0].rating: "0.121"
$.dataframes[3].resources[0].service: "image.size"
$.dataframes[3].resources[0].desc.dummy: true
$.dataframes[3].resources[0].desc.fake_meta: 1.0
$.dataframes[3].resources[0].desc.dummy: 'True'
$.dataframes[3].resources[0].desc.fake_meta: '1.0'
- name: fetch data filtering on cpu service and tenant
url: /v1/storage/dataframes
@ -177,8 +177,8 @@ tests:
$.dataframes[0].resources[0].volume: "1"
$.dataframes[0].resources[0].rating: "1.337"
$.dataframes[0].resources[0].service: "cpu"
$.dataframes[0].resources[0].desc.dummy: true
$.dataframes[0].resources[0].desc.fake_meta: 1.0
$.dataframes[0].resources[0].desc.dummy: 'True'
$.dataframes[0].resources[0].desc.fake_meta: '1.0'
- name: fetch data filtering on image service and tenant
url: /v1/storage/dataframes
@ -197,8 +197,8 @@ tests:
$.dataframes[0].resources[0].volume: "1"
$.dataframes[0].resources[0].rating: "0.121"
$.dataframes[0].resources[0].service: "image.size"
$.dataframes[0].resources[0].desc.dummy: true
$.dataframes[0].resources[0].desc.fake_meta: 1.0
$.dataframes[0].resources[0].desc.dummy: 'True'
$.dataframes[0].resources[0].desc.fake_meta: '1.0'
- name: fetch data filtering on service with no data and tenant
url: /v1/storage/dataframes

View File

@ -19,6 +19,7 @@ import decimal
from oslo_utils import uuidutils
from cloudkitty import dataframe
from cloudkitty import utils as ck_utils
# These have a different format in order to check that both forms are supported
@ -40,21 +41,24 @@ COMPUTE_METADATA = {
'flavor': 'm1.nano',
'image_id': 'f5600101-8fa2-4864-899e-ebcb7ed6b568',
'instance_id': '26c084e1-b8f1-4cbc-a7ec-e8b356788a17',
'id': '1558f911-b55a-4fd2-9173-c8f1f23e5639',
'resource_id': '1558f911-b55a-4fd2-9173-c8f1f23e5639',
'memory': '64',
'metadata': {
'farm': 'prod'
},
'name': 'prod1',
'vcpus': '1'
}
COMPUTE_GROUPBY = {
'id': '1558f911-b55a-4fd2-9173-c8f1f23e5639',
'project_id': 'f266f30b11f246b589fd266f85eeec39',
'user_id': '55b3379b949243009ee96972fbf51ed1',
'vcpus': '1'}
}
IMAGE_METADATA = {
'checksum': '836c69cbcd1dc4f225daedbab6edc7c7',
'resource_id': '7b5b73f2-9181-4307-a710-b1aa6472526d',
'id': '7b5b73f2-9181-4307-a710-b1aa6472526d',
'container_format': 'aki',
'created_at': '2014-06-04T16:26:01',
'deleted': 'False',
@ -67,48 +71,43 @@ IMAGE_METADATA = {
'protected': 'False',
'size': '4969360',
'status': 'active',
'updated_at': '2014-06-04T16:26:02'}
'updated_at': '2014-06-04T16:26:02',
}
IMAGE_GROUPBY = {
'id': '7b5b73f2-9181-4307-a710-b1aa6472526d',
}
FIRST_PERIOD = {
'begin': FIRST_PERIOD_BEGIN,
'end': FIRST_PERIOD_END}
'end': FIRST_PERIOD_END,
}
SECOND_PERIOD = {
'begin': SECOND_PERIOD_BEGIN,
'end': SECOND_PERIOD_END}
'end': SECOND_PERIOD_END,
}
COLLECTED_DATA = [
dataframe.DataFrame(start=FIRST_PERIOD["begin"],
end=FIRST_PERIOD["end"]),
dataframe.DataFrame(start=SECOND_PERIOD["begin"],
end=SECOND_PERIOD["end"]),
]
_INSTANCE_POINT = dataframe.DataPoint(
'instance', '1.0', '0.42', COMPUTE_GROUPBY, COMPUTE_METADATA)
_IMAGE_SIZE_POINT = dataframe.DataPoint(
'image', '1.0', '0.1337', IMAGE_GROUPBY, IMAGE_METADATA)
COLLECTED_DATA[0].add_point(_INSTANCE_POINT, 'instance')
COLLECTED_DATA[0].add_point(_IMAGE_SIZE_POINT, 'image.size')
COLLECTED_DATA[1].add_point(_INSTANCE_POINT, 'instance')
COLLECTED_DATA = [{
'period': FIRST_PERIOD,
'usage': {
'instance': [{
'desc': COMPUTE_METADATA,
'vol': {
'qty': decimal.Decimal(1.0),
'unit': 'instance'}}],
'image.size': [{
'desc': IMAGE_METADATA,
'vol': {
'qty': decimal.Decimal(1.0),
'unit': 'image'}}]
}}, {
'period': SECOND_PERIOD,
'usage': {
'instance': [{
'desc': COMPUTE_METADATA,
'vol': {
'qty': decimal.Decimal(1.0),
'unit': 'instance'}}]
},
}]
RATED_DATA = copy.deepcopy(COLLECTED_DATA)
RATED_DATA[0]['usage']['instance'][0]['rating'] = {
'price': decimal.Decimal('0.42')}
RATED_DATA[0]['usage']['image.size'][0]['rating'] = {
'price': decimal.Decimal('0.1337')}
RATED_DATA[1]['usage']['instance'][0]['rating'] = {
'price': decimal.Decimal('0.42')}
DEFAULT_METRICS_CONF = {
"metrics": {
@ -221,33 +220,6 @@ DEFAULT_METRICS_CONF = {
}
def split_storage_data(raw_data):
final_data = []
for frame in raw_data:
frame['period']['begin'] = ck_utils.dt2iso(frame['period']['begin'])
frame['period']['end'] = ck_utils.dt2iso(frame['period']['end'])
usage_buffer = frame.pop('usage')
# Sort to have a consistent result as we are converting it to a list
for service, data in sorted(usage_buffer.items()):
new_frame = copy.deepcopy(frame)
new_frame['usage'] = {service: data}
new_frame['usage'][service][0]['tenant_id'] = TENANT
final_data.append(new_frame)
return final_data
# FIXME(sheeprine): storage is not using decimal for rates, we need to
# transition to decimal.
STORED_DATA = copy.deepcopy(COLLECTED_DATA)
STORED_DATA[0]['usage']['instance'][0]['rating'] = {
'price': 0.42}
STORED_DATA[0]['usage']['image.size'][0]['rating'] = {
'price': 0.1337}
STORED_DATA[1]['usage']['instance'][0]['rating'] = {
'price': 0.42}
STORED_DATA = split_storage_data(STORED_DATA)
METRICS_CONF = DEFAULT_METRICS_CONF
@ -306,7 +278,7 @@ V2_STORAGE_SAMPLE = {
},
"groupby": {
"id": uuidutils.generate_uuid(),
"project_id": COMPUTE_METADATA['project_id'],
"project_id": COMPUTE_GROUPBY['project_id'],
},
"metadata": {
"flavor": "m1.nano",
@ -323,7 +295,7 @@ V2_STORAGE_SAMPLE = {
},
"groupby": {
"id": uuidutils.generate_uuid(),
"project_id": COMPUTE_METADATA['project_id'],
"project_id": COMPUTE_GROUPBY['project_id'],
},
"metadata": {
"disk_format": "qcow2",
@ -339,7 +311,7 @@ V2_STORAGE_SAMPLE = {
},
"groupby": {
"id": uuidutils.generate_uuid(),
"project_id": COMPUTE_METADATA['project_id'],
"project_id": COMPUTE_GROUPBY['project_id'],
},
"metadata": {
"volume_type": "ceph-region1"
@ -355,7 +327,7 @@ V2_STORAGE_SAMPLE = {
},
"groupby": {
"id": uuidutils.generate_uuid(),
"project_id": COMPUTE_METADATA['project_id'],
"project_id": COMPUTE_GROUPBY['project_id'],
},
"metadata": {
"instance_id": uuidutils.generate_uuid(),
@ -371,7 +343,7 @@ V2_STORAGE_SAMPLE = {
},
"groupby": {
"id": uuidutils.generate_uuid(),
"project_id": COMPUTE_METADATA['project_id'],
"project_id": COMPUTE_GROUPBY['project_id'],
},
"metadata": {
"instance_id": uuidutils.generate_uuid(),
@ -387,7 +359,7 @@ V2_STORAGE_SAMPLE = {
},
"groupby": {
"id": uuidutils.generate_uuid(),
"project_id": COMPUTE_METADATA['project_id'],
"project_id": COMPUTE_GROUPBY['project_id'],
},
"metadata": {
"state": "attached",
@ -403,7 +375,7 @@ V2_STORAGE_SAMPLE = {
},
"groupby": {
"id": uuidutils.generate_uuid(),
"project_id": COMPUTE_METADATA['project_id'],
"project_id": COMPUTE_GROUPBY['project_id'],
},
"metadata": {
"object_id": uuidutils.generate_uuid(),

View File

@ -18,6 +18,7 @@ import unittest
import mock
from cloudkitty import dataframe
from cloudkitty.storage.v2 import influx
from cloudkitty.tests import TestCase
@ -40,24 +41,28 @@ class TestInfluxDBStorage(TestCase):
def test_point_to_dataframe_entry_valid_point(self):
self.assertEqual(
influx.InfluxStorage._point_to_dataframe_entry(self.point), {
'vol': {'unit': 'banana', 'qty': 42},
'rating': {'price': 1.0},
'groupby': {'one': '1', 'two': '2'},
'metadata': {'1': 'one', '2': 'two'},
}
influx.InfluxStorage._point_to_dataframe_entry(self.point),
dataframe.DataPoint(
'banana',
42,
1,
{'one': '1', 'two': '2'},
{'1': 'one', '2': 'two'},
),
)
def test_point_to_dataframe_entry_invalid_groupby_metadata(self):
self.point['groupby'] = 'a'
self.point['metadata'] = None
self.assertEqual(
influx.InfluxStorage._point_to_dataframe_entry(self.point), {
'vol': {'unit': 'banana', 'qty': 42},
'rating': {'price': 1.0},
'groupby': {'a': ''},
'metadata': {}
}
influx.InfluxStorage._point_to_dataframe_entry(self.point),
dataframe.DataPoint(
'banana',
42,
1,
{'a': ''},
{},
),
)

View File

@ -70,15 +70,15 @@ class StorageUnitTest(TestCase):
total = 0
qty = 0
length = 0
for data_part in data:
for mtype, usage_part in data_part['usage'].items():
for dataframe in data:
for mtype, points in dataframe.itertypes():
if types is not None and mtype not in types:
continue
for item in usage_part:
for point in points:
if project_id is None or \
project_id == item['groupby']['project_id']:
total += item['rating']['price']
qty += item['vol']['qty']
project_id == point.groupby['project_id']:
total += point.price
qty += point.qty
length += 1
return round(float(total), 5), round(float(qty), 5), length
@ -274,10 +274,8 @@ class StorageUnitTest(TestCase):
frames = self.storage.retrieve(begin=begin, end=end)
self.assertEqual(frames['total'], expected_length)
retrieved_length = 0
for data_part in frames['dataframes']:
for usage_part in data_part['usage'].values():
retrieved_length += len(usage_part)
retrieved_length = sum(len(list(frame.iterpoints()))
for frame in frames['dataframes'])
self.assertEqual(expected_length, retrieved_length)
@ -292,10 +290,8 @@ class StorageUnitTest(TestCase):
metric_types=['image.size'])
self.assertEqual(frames['total'], expected_length)
retrieved_length = 0
for data_part in frames['dataframes']:
for usage_part in data_part['usage'].values():
retrieved_length += len(usage_part)
retrieved_length = sum(len(list(frame.iterpoints()))
for frame in frames['dataframes'])
self.assertEqual(expected_length, retrieved_length)
@ -313,10 +309,8 @@ class StorageUnitTest(TestCase):
metric_types=['image.size', 'instance'])
self.assertEqual(frames['total'], expected_length)
retrieved_length = 0
for data_part in frames['dataframes']:
for usage_part in data_part['usage'].values():
retrieved_length += len(usage_part)
retrieved_length = sum(len(list(frame.iterpoints()))
for frame in frames['dataframes'])
self.assertEqual(expected_length, retrieved_length)

View File

@ -0,0 +1,274 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import datetime
import decimal
import unittest
from dateutil import tz
from werkzeug import datastructures
from cloudkitty import dataframe
from cloudkitty import json_utils as json
class TestDataPoint(unittest.TestCase):
default_params = {
'qty': 0,
'price': 0,
'unit': None,
'groupby': {},
'metadata': {},
}
def test_create_empty_datapoint(self):
point = dataframe.DataPoint(**self.default_params)
self.assertEqual(point.qty, decimal.Decimal(0))
self.assertEqual(point.price, decimal.Decimal(0))
self.assertEqual(point.unit, "undefined")
self.assertEqual(point.groupby, {})
def test_readonly_attrs(self):
point = dataframe.DataPoint(**self.default_params)
for attr in ("qty", "price", "unit"):
self.assertRaises(AttributeError, setattr, point, attr, 'x')
def test_properties(self):
params = copy.deepcopy(self.default_params)
groupby = {"group_one": "one", "group_two": "two"}
metadata = {"meta_one": "one", "meta_two": "two"}
params.update({'groupby': groupby, 'metadata': metadata})
point = dataframe.DataPoint(**params)
self.assertEqual(point.groupby, groupby)
self.assertEqual(point.metadata, metadata)
def test_as_dict_mutable_standard(self):
self.assertEqual(
dataframe.DataPoint(
**self.default_params).as_dict(mutable=True),
{
"vol": {"unit": "undefined", "qty": decimal.Decimal(0)},
"rating": {"price": decimal.Decimal(0)},
"groupby": {},
"metadata": {},
}
)
def test_as_dict_mutable_legacy(self):
self.assertEqual(
dataframe.DataPoint(**self.default_params).as_dict(
legacy=True, mutable=True),
{
"vol": {"unit": "undefined", "qty": decimal.Decimal(0)},
"rating": {"price": decimal.Decimal(0)},
"desc": {},
}
)
def test_as_dict_immutable(self):
point_dict = dataframe.DataPoint(**self.default_params).as_dict()
self.assertIsInstance(point_dict, datastructures.ImmutableDict)
self.assertEqual(dict(point_dict), {
"vol": {"unit": "undefined", "qty": decimal.Decimal(0)},
"rating": {"price": decimal.Decimal(0)},
"groupby": {},
"metadata": {},
})
def test_json_standard(self):
self.assertEqual(
json.loads(dataframe.DataPoint(**self.default_params).json()), {
"vol": {"unit": "undefined", "qty": decimal.Decimal(0)},
"rating": {"price": decimal.Decimal(0)},
"groupby": {},
"metadata": {},
}
)
def test_json_legacy(self):
self.assertEqual(
json.loads(dataframe.DataPoint(
**self.default_params).json(legacy=True)),
{
"vol": {"unit": "undefined", "qty": decimal.Decimal(0)},
"rating": {"price": decimal.Decimal(0)},
"desc": {},
}
)
def test_from_dict_valid_dict(self):
self.assertEqual(
dataframe.DataPoint(
unit="amazing_unit",
qty=3,
price=0,
groupby={"g_one": "one", "g_two": "two"},
metadata={"m_one": "one", "m_two": "two"},
).as_dict(),
dataframe.DataPoint.from_dict({
"vol": {"unit": "amazing_unit", "qty": 3},
"groupby": {"g_one": "one", "g_two": "two"},
"metadata": {"m_one": "one", "m_two": "two"},
}).as_dict(),
)
def test_from_dict_invalid(self):
invalid = {
"vol": {},
"desc": {"a": "b"},
}
self.assertRaises(ValueError, dataframe.DataPoint.from_dict, invalid)
def test_set_price(self):
point = dataframe.DataPoint(**self.default_params)
self.assertEqual(point.price, decimal.Decimal(0))
self.assertEqual(point.set_price(42).price, decimal.Decimal(42))
self.assertEqual(point.set_price(1337).price, decimal.Decimal(1337))
def test_desc(self):
params = copy.deepcopy(self.default_params)
params['groupby'] = {'group_one': 'one', 'group_two': 'two'}
params['metadata'] = {'meta_one': 'one', 'meta_two': 'two'}
point = dataframe.DataPoint(**params)
self.assertEqual(point.desc, {
'group_one': 'one',
'group_two': 'two',
'meta_one': 'one',
'meta_two': 'two',
})
class TestDataFrame(unittest.TestCase):
def test_dataframe_add_points(self):
start = datetime.datetime(2019, 3, 4, 1, tzinfo=tz.UTC)
end = datetime.datetime(2019, 3, 4, 2, tzinfo=tz.UTC)
df = dataframe.DataFrame(start=start, end=end)
a_points = [dataframe.DataPoint(**TestDataPoint.default_params)
for _ in range(2)]
b_points = [dataframe.DataPoint(**TestDataPoint.default_params)
for _ in range(4)]
df.add_point(a_points[0], 'service_a')
df.add_points(a_points[1:], 'service_a')
df.add_points(b_points[:2], 'service_b')
df.add_points(b_points[2:3], 'service_b')
df.add_point(b_points[3], 'service_b')
self.assertEqual(dict(df.as_dict()), {
'period': {'begin': start, 'end': end},
'usage': {
'service_a': [
dataframe.DataPoint(
**TestDataPoint.default_params).as_dict()
for _ in range(2)],
'service_b': [
dataframe.DataPoint(
**TestDataPoint.default_params).as_dict()
for _ in range(4)],
}
})
def test_properties(self):
start = datetime.datetime(2019, 6, 1, tzinfo=tz.UTC)
end = datetime.datetime(2019, 6, 1, 1, tzinfo=tz.UTC)
df = dataframe.DataFrame(start=start, end=end)
self.assertEqual(df.start, start)
self.assertEqual(df.end, end)
def test_json(self):
start = datetime.datetime(2019, 3, 4, 1, tzinfo=tz.UTC)
end = datetime.datetime(2019, 3, 4, 2, tzinfo=tz.UTC)
df = dataframe.DataFrame(start=start, end=end)
a_points = [dataframe.DataPoint(**TestDataPoint.default_params)
for _ in range(2)]
b_points = [dataframe.DataPoint(**TestDataPoint.default_params)
for _ in range(4)]
df.add_points(a_points, 'service_a')
df.add_points(b_points, 'service_b')
self.maxDiff = None
self.assertEqual(json.loads(df.json()), json.loads(json.dumps({
'period': {'begin': start.isoformat(),
'end': end.isoformat()},
'usage': {
'service_a': [
dataframe.DataPoint(
**TestDataPoint.default_params).as_dict()
for _ in range(2)],
'service_b': [
dataframe.DataPoint(
**TestDataPoint.default_params).as_dict()
for _ in range(4)],
}
})))
def test_from_dict_valid_dict(self):
start = datetime.datetime(2019, 1, 2, 12, tzinfo=tz.UTC)
end = datetime.datetime(2019, 1, 2, 13, tzinfo=tz.UTC)
point = dataframe.DataPoint(
'unit', 0, 0, {'g_one': 'one'}, {'m_two': 'two'})
usage = {'metric_x': [point]}
dict_usage = {'metric_x': [point.as_dict(mutable=True)]}
self.assertEqual(
dataframe.DataFrame(start, end, usage).as_dict(),
dataframe.DataFrame.from_dict({
'period': {'begin': start, 'end': end},
'usage': dict_usage,
}).as_dict(),
)
def test_from_dict_valid_dict_date_as_str(self):
start = datetime.datetime(2019, 1, 2, 12, tzinfo=tz.UTC)
end = datetime.datetime(2019, 1, 2, 13, tzinfo=tz.UTC)
point = dataframe.DataPoint(
'unit', 0, 0, {'g_one': 'one'}, {'m_two': 'two'})
usage = {'metric_x': [point]}
dict_usage = {'metric_x': [point.as_dict(mutable=True)]}
self.assertEqual(
dataframe.DataFrame(start, end, usage).as_dict(),
dataframe.DataFrame.from_dict({
'period': {'begin': start.isoformat(), 'end': end.isoformat()},
'usage': dict_usage,
}).as_dict(),
)
def test_from_dict_invalid_dict(self):
self.assertRaises(
ValueError, dataframe.DataFrame.from_dict, {'usage': None})
def test_repr(self):
start = datetime.datetime(2019, 3, 4, 1, tzinfo=tz.UTC)
end = datetime.datetime(2019, 3, 4, 2, tzinfo=tz.UTC)
df = dataframe.DataFrame(start=start, end=end)
points = [dataframe.DataPoint(**TestDataPoint.default_params)
for _ in range(4)]
df.add_points(points, 'metric_x')
self.assertEqual(str(df), "DataFrame(metrics=[metric_x])")
df.add_points(points, 'metric_y')
self.assertEqual(str(df), "DataFrame(metrics=[metric_x,metric_y])")
def test_iterpoints(self):
start = datetime.datetime(2019, 3, 4, 1, tzinfo=tz.UTC)
end = datetime.datetime(2019, 3, 4, 2, tzinfo=tz.UTC)
df = dataframe.DataFrame(start=start, end=end)
points = [dataframe.DataPoint(**TestDataPoint.default_params)
for _ in range(4)]
df.add_points(points, 'metric_x')
expected = [
('metric_x', dataframe.DataPoint(**TestDataPoint.default_params))
for _ in range(4)]
self.assertEqual(list(df.iterpoints()), expected)

View File

@ -14,11 +14,13 @@
# under the License.
#
import copy
import datetime
import decimal
import mock
from oslo_utils import uuidutils
from cloudkitty import dataframe
from cloudkitty.rating import hash
from cloudkitty.rating.hash.db import api
from cloudkitty import tests
@ -26,10 +28,10 @@ from cloudkitty import tests
TEST_TS = 1388577600
FAKE_UUID = '6c1b8a30-797f-4b7e-ad66-9879b79059fb'
CK_RESOURCES_DATA = [{
CK_RESOURCES_DATA = [dataframe.DataFrame.from_dict({
"period": {
"begin": "2014-10-01T00:00:00",
"end": "2014-10-01T01:00:00"},
"begin": datetime.datetime(2014, 10, 1),
"end": datetime.datetime(2014, 10, 1, 1)},
"usage": {
"compute": [
{
@ -94,7 +96,7 @@ CK_RESOURCES_DATA = [{
"vcpus": "1"},
"vol": {
"qty": 1,
"unit": "instance"}}]}}]
"unit": "instance"}}]}}, legacy=True)]
class HashMapRatingTest(tests.TestCase):
@ -859,21 +861,24 @@ class HashMapRatingTest(tests.TestCase):
map_type='flat',
service_id=service_db.service_id)
self._hash.reload_config()
actual_data = copy.deepcopy(CK_RESOURCES_DATA)
expected_data = copy.deepcopy(CK_RESOURCES_DATA)
for cur_data in actual_data:
cur_usage = cur_data['usage']
for service_name, service_data in cur_usage.items():
for item in service_data:
self._hash._res = {}
self._hash.process_services(service_name, item)
self._hash.add_rating_informations(item)
compute_list = expected_data[0]['usage']['compute']
actual_data = dataframe.DataFrame(start=expected_data[0].start,
end=expected_data[0].end)
for cur_data in expected_data:
for service_name, point in cur_data.iterpoints():
self._hash._res = {}
self._hash.process_services(service_name, point)
actual_data.add_point(
self._hash.add_rating_informations(point), service_name)
actual_data = [actual_data]
df_dicts = [d.as_dict(mutable=True) for d in expected_data]
compute_list = df_dicts[0]['usage']['compute']
compute_list[0]['rating'] = {'price': decimal.Decimal('2.757')}
compute_list[1]['rating'] = {'price': decimal.Decimal('5.514')}
compute_list[2]['rating'] = {'price': decimal.Decimal('5.514')}
compute_list[3]['rating'] = {'price': decimal.Decimal('2.757')}
self.assertEqual(expected_data, actual_data)
self.assertEqual(df_dicts, [d.as_dict(mutable=True)
for d in actual_data])
def test_process_fields(self):
service_db = self._db_api.create_service('compute')
@ -900,21 +905,24 @@ class HashMapRatingTest(tests.TestCase):
map_type='flat',
field_id=flavor_field.field_id)
self._hash.reload_config()
actual_data = copy.deepcopy(CK_RESOURCES_DATA)
expected_data = copy.deepcopy(CK_RESOURCES_DATA)
for cur_data in actual_data:
cur_usage = cur_data['usage']
for service_name, service_data in cur_usage.items():
for item in service_data:
self._hash._res = {}
self._hash.process_fields(service_name, item)
self._hash.add_rating_informations(item)
compute_list = expected_data[0]['usage']['compute']
actual_data = dataframe.DataFrame(start=expected_data[0].start,
end=expected_data[0].end)
for cur_data in expected_data:
for service_name, point in cur_data.iterpoints():
self._hash._res = {}
self._hash.process_fields(service_name, point)
actual_data.add_point(
self._hash.add_rating_informations(point), service_name)
actual_data = [actual_data]
df_dicts = [d.as_dict(mutable=True) for d in expected_data]
compute_list = df_dicts[0]['usage']['compute']
compute_list[0]['rating'] = {'price': decimal.Decimal('1.337')}
compute_list[1]['rating'] = {'price': decimal.Decimal('2.84')}
compute_list[2]['rating'] = {'price': decimal.Decimal('0')}
compute_list[3]['rating'] = {'price': decimal.Decimal('1.47070')}
self.assertEqual(expected_data, actual_data)
self.assertEqual(df_dicts, [d.as_dict(mutable=True)
for d in actual_data])
def test_process_fields_no_match(self):
service_db = self._db_api.create_service('compute')
@ -926,18 +934,19 @@ class HashMapRatingTest(tests.TestCase):
map_type='flat',
field_id=flavor_field.field_id)
self._hash.reload_config()
actual_data = copy.deepcopy(CK_RESOURCES_DATA)
expected_data = copy.deepcopy(CK_RESOURCES_DATA)
for cur_data in actual_data:
cur_usage = cur_data['usage']
for service_name, service_data in cur_usage.items():
for item in service_data:
self._hash._res = {}
self._hash.process_fields(service_name, item)
self._hash.add_rating_informations(item)
for elem in expected_data[0]['usage']['compute']:
elem['rating'] = {'price': decimal.Decimal('0')}
self.assertEqual(expected_data, actual_data)
actual_data = dataframe.DataFrame(start=expected_data[0].start,
end=expected_data[0].end)
for cur_data in expected_data:
for service_name, point in cur_data.iterpoints():
self._hash._res = {}
self._hash.process_fields(service_name, point)
actual_data.add_point(
self._hash.add_rating_informations(point), service_name)
actual_data = [actual_data]
df_dicts = [d.as_dict(mutable=True) for d in expected_data]
self.assertEqual(df_dicts, [d.as_dict(mutable=True)
for d in actual_data])
def test_process_field_threshold(self):
service_db = self._db_api.create_service('compute')
@ -954,21 +963,24 @@ class HashMapRatingTest(tests.TestCase):
map_type='flat',
field_id=field_db.field_id)
self._hash.reload_config()
actual_data = copy.deepcopy(CK_RESOURCES_DATA)
expected_data = copy.deepcopy(CK_RESOURCES_DATA)
for cur_data in actual_data:
cur_usage = cur_data['usage']
for service_name, service_data in cur_usage.items():
for item in service_data:
self._hash._res = {}
self._hash.process_fields(service_name, item)
self._hash.add_rating_informations(item)
compute_list = expected_data[0]['usage']['compute']
actual_data = dataframe.DataFrame(start=expected_data[0].start,
end=expected_data[0].end)
for cur_data in expected_data:
for service_name, point in cur_data.iterpoints():
self._hash._res = {}
self._hash.process_fields(service_name, point)
actual_data.add_point(
self._hash.add_rating_informations(point), service_name)
actual_data = [actual_data]
df_dicts = [d.as_dict(mutable=True) for d in expected_data]
compute_list = df_dicts[0]['usage']['compute']
compute_list[0]['rating'] = {'price': decimal.Decimal('0.1337')}
compute_list[1]['rating'] = {'price': decimal.Decimal('0.4')}
compute_list[2]['rating'] = {'price': decimal.Decimal('0.4')}
compute_list[3]['rating'] = {'price': decimal.Decimal('0.1337')}
self.assertEqual(expected_data, actual_data)
self.assertEqual(df_dicts, [d.as_dict(mutable=True)
for d in actual_data])
def test_process_field_threshold_no_match(self):
service_db = self._db_api.create_service('compute')
@ -980,18 +992,18 @@ class HashMapRatingTest(tests.TestCase):
map_type='flat',
field_id=field_db.field_id)
self._hash.reload_config()
actual_data = copy.deepcopy(CK_RESOURCES_DATA)
expected_data = copy.deepcopy(CK_RESOURCES_DATA)
for cur_data in actual_data:
cur_usage = cur_data['usage']
for service_name, service_data in cur_usage.items():
for item in service_data:
self._hash._res = {}
self._hash.process_fields(service_name, item)
self._hash.add_rating_informations(item)
for elem in expected_data[0]['usage']['compute']:
elem['rating'] = {'price': decimal.Decimal('0')}
self.assertEqual(expected_data, actual_data)
actual_data = dataframe.DataFrame(start=expected_data[0].start,
end=expected_data[0].end)
for cur_data in expected_data:
for service_name, point in cur_data.iterpoints():
self._hash._res = {}
self._hash.process_fields(service_name, point)
actual_data.add_point(
self._hash.add_rating_informations(point), service_name)
actual_data = [actual_data]
self.assertEqual([d.as_dict(mutable=True) for d in expected_data],
[d.as_dict(mutable=True) for d in actual_data])
def test_process_service_threshold(self):
service_db = self._db_api.create_service('compute')
@ -1006,21 +1018,24 @@ class HashMapRatingTest(tests.TestCase):
map_type='flat',
service_id=service_db.service_id)
self._hash.reload_config()
actual_data = copy.deepcopy(CK_RESOURCES_DATA)
expected_data = copy.deepcopy(CK_RESOURCES_DATA)
for cur_data in actual_data:
cur_usage = cur_data['usage']
for service_name, service_data in cur_usage.items():
for item in service_data:
self._hash._res = {}
self._hash.process_services(service_name, item)
self._hash.add_rating_informations(item)
compute_list = expected_data[0]['usage']['compute']
actual_data = dataframe.DataFrame(start=expected_data[0].start,
end=expected_data[0].end)
for cur_data in expected_data:
for service_name, point in cur_data.iterpoints():
self._hash._res = {}
self._hash.process_services(service_name, point)
actual_data.add_point(
self._hash.add_rating_informations(point), service_name)
actual_data = [actual_data]
df_dicts = [d.as_dict(mutable=True) for d in expected_data]
compute_list = df_dicts[0]['usage']['compute']
compute_list[0]['rating'] = {'price': decimal.Decimal('0.1')}
compute_list[1]['rating'] = {'price': decimal.Decimal('0.15')}
compute_list[2]['rating'] = {'price': decimal.Decimal('0.15')}
compute_list[3]['rating'] = {'price': decimal.Decimal('0.1')}
self.assertEqual(expected_data, actual_data)
self.assertEqual(df_dicts, [d.as_dict(mutable=True)
for d in actual_data])
def test_update_result_flat(self):
self._hash.update_result(
@ -1155,13 +1170,16 @@ class HashMapRatingTest(tests.TestCase):
field_id=memory_db.field_id,
group_id=group_db.group_id)
self._hash.reload_config()
actual_data = copy.deepcopy(CK_RESOURCES_DATA)
expected_data = copy.deepcopy(CK_RESOURCES_DATA)
compute_list = expected_data[0]['usage']['compute']
actual_data = dataframe.DataFrame(start=expected_data[0].start,
end=expected_data[0].end)
df_dicts = [d.as_dict(mutable=True) for d in expected_data]
compute_list = df_dicts[0]['usage']['compute']
compute_list[0]['rating'] = {'price': decimal.Decimal('2.487')}
compute_list[1]['rating'] = {'price': decimal.Decimal('5.564')}
# 8vcpu mapping * 2 + service_mapping * 1 + 128m ram threshold * 2
compute_list[2]['rating'] = {'price': decimal.Decimal('34.40')}
compute_list[3]['rating'] = {'price': decimal.Decimal('2.6357')}
self._hash.process(actual_data)
self.assertEqual(expected_data, actual_data)
actual_data = [self._hash.process(d) for d in expected_data]
self.assertEqual(df_dicts, [d.as_dict(mutable=True)
for d in actual_data])

View File

@ -182,31 +182,31 @@ class WorkerTest(tests.TestCase):
self.worker._collect = mock.MagicMock()
def test_do_collection_all_valid(self):
side_effect = [
metrics = ['metric{}'.format(i) for i in range(5)]
side_effect = [(
metrics[i],
{'period': {'begin': 0,
'end': 3600},
'usage': [i]}
for i in range(5)
]
'usage': i},
) for i in range(5)]
self.worker._collect.side_effect = side_effect
metrics = ['metric{}'.format(i) for i in range(5)]
output = sorted(self.worker._do_collection(metrics, 0),
key=lambda x: x['usage'][0])
output = sorted(self.worker._do_collection(metrics, 0).items(),
key=lambda x: x[1]['usage'])
self.assertEqual(side_effect, output)
def test_do_collection_some_empty(self):
side_effect = [
metrics = ['metric{}'.format(i) for i in range(7)]
side_effect = [(
metrics[i],
{'period': {'begin': 0,
'end': 3600},
'usage': [i]}
for i in range(5)
]
'usage': i},
) for i in range(5)]
side_effect.insert(2, collector.NoDataCollected('a', 'b'))
side_effect.insert(4, collector.NoDataCollected('a', 'b'))
self.worker._collect.side_effect = side_effect
metrics = ['metric{}'.format(i) for i in range(7)]
output = sorted(self.worker._do_collection(metrics, 0),
key=lambda x: x['usage'][0])
output = sorted(self.worker._do_collection(metrics, 0).items(),
key=lambda x: x[1]['usage'])
self.assertEqual([
i for i in side_effect
if not isinstance(i, collector.NoDataCollected)

View File

@ -18,6 +18,7 @@ import random
from oslo_utils import uuidutils
from cloudkitty import dataframe
from cloudkitty.tests import samples
@ -32,25 +33,25 @@ def generate_v2_storage_data(min_length=10,
elif not isinstance(project_ids, list):
project_ids = [project_ids]
usage = {}
df = dataframe.DataFrame(start=start, end=end)
for metric_name, sample in samples.V2_STORAGE_SAMPLE.items():
dataframes = []
datapoints = []
for project_id in project_ids:
data = [copy.deepcopy(sample)
for i in range(min_length + random.randint(1, 10))]
for elem in data:
elem['groupby']['id'] = uuidutils.generate_uuid()
elem['groupby']['project_id'] = project_id
dataframes += data
usage[metric_name] = dataframes
datapoints += [dataframe.DataPoint(
elem['vol']['unit'],
elem['vol']['qty'],
elem['rating']['price'],
elem['groupby'],
elem['metadata'],
) for elem in data]
df.add_points(datapoints, metric_name)
return {
'usage': usage,
'period': {
'begin': start,
'end': end
}
}
return df
def load_conf(*args):

View File

@ -16,6 +16,7 @@
from oslo_log import log
from cloudkitty import dataframe
from cloudkitty import transformer
@ -24,15 +25,16 @@ LOG = log.getLogger(__name__)
class CloudKittyFormatTransformer(transformer.BaseTransformer):
def format_item(self, groupby, metadata, unit, qty=1.0):
data = {}
data['groupby'] = groupby
data['metadata'] = metadata
# For backward compatibility.
data['desc'] = data['groupby'].copy()
data['desc'].update(data['metadata'])
data['vol'] = {'unit': unit, 'qty': qty}
# data = {}
# data['groupby'] = groupby
# data['metadata'] = metadata
# # For backward compatibility.
# data['desc'] = data['groupby'].copy()
# data['desc'].update(data['metadata'])
# data['vol'] = {'unit': unit, 'qty': qty}
return data
return dataframe.DataPoint(unit, qty, 0, groupby, metadata)
# return data
def format_service(self, service, items):
data = {}

View File

@ -0,0 +1,5 @@
---
other:
- |
Data frames/points are now internally represented as objects rather than
dicts.