Remove usage of unix timestamps
This removes usage of unix timestamps in the codebase. They're replaced by datetime objects. This is part of a larger effort aiming at making cloudkitty timezone-aware. Timezone information will be added to the datetime objects in another patch. Change-Id: I7aadc314ceb5e36277bb51f55065ef558f10a7f2 Story: 2005319 Task: 30235
This commit is contained in:
parent
16b6fa5ce8
commit
0c1546d4aa
@ -25,7 +25,6 @@ import wsmeext.pecan as wsme_pecan
|
||||
from cloudkitty.api.v1.datamodels import storage as storage_models
|
||||
from cloudkitty.common import policy
|
||||
from cloudkitty import storage
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -62,10 +61,6 @@ class DataFramesController(rest.RestController):
|
||||
dataframes = []
|
||||
filters = {scope_key: tenant_id} if tenant_id else None
|
||||
|
||||
if begin:
|
||||
begin = ck_utils.dt2ts(begin)
|
||||
if end:
|
||||
end = ck_utils.dt2ts(end)
|
||||
try:
|
||||
resp = backend.retrieve(
|
||||
begin, end,
|
||||
@ -95,8 +90,8 @@ class DataFramesController(rest.RestController):
|
||||
frame_tenant = desc[scope_key]
|
||||
resources.append(resource)
|
||||
dataframe = storage_models.DataFrame(
|
||||
begin=ck_utils.iso2dt(frame['period']['begin']),
|
||||
end=ck_utils.iso2dt(frame['period']['end']),
|
||||
begin=frame['period']['begin'],
|
||||
end=frame['period']['end'],
|
||||
tenant_id=frame_tenant,
|
||||
resources=resources)
|
||||
dataframes.append(dataframe)
|
||||
|
@ -78,7 +78,7 @@ class ScopeState(base.BaseResource):
|
||||
'scope_key': r.scope_key,
|
||||
'fetcher': r.fetcher,
|
||||
'collector': r.collector,
|
||||
'state': str(r.state),
|
||||
'state': r.state.isoformat(),
|
||||
} for r in results]
|
||||
}
|
||||
|
||||
|
@ -222,19 +222,6 @@ class BaseCollector(object):
|
||||
|
||||
return output
|
||||
|
||||
@staticmethod
|
||||
def last_month():
|
||||
month_start = ck_utils.get_month_start()
|
||||
month_end = ck_utils.get_month_end()
|
||||
start_ts = ck_utils.dt2ts(month_start)
|
||||
end_ts = ck_utils.dt2ts(month_end)
|
||||
return start_ts, end_ts
|
||||
|
||||
@staticmethod
|
||||
def current_month():
|
||||
month_start = ck_utils.get_month_start()
|
||||
return ck_utils.dt2ts(month_start)
|
||||
|
||||
@classmethod
|
||||
def _res_to_func(cls, resource_name):
|
||||
trans_resource = 'get_'
|
||||
@ -262,8 +249,18 @@ class BaseCollector(object):
|
||||
|
||||
Returns a list of items formatted with
|
||||
``CloudKittyFormatTransformer.format_item``.
|
||||
|
||||
:param metric_name: Name of the metric to fetch
|
||||
:type metric_name: str
|
||||
:param start: start of the period
|
||||
:type start: datetime.datetime
|
||||
:param end: end of the period
|
||||
:type end: datetime.datetime
|
||||
:param project_id: ID of the scope for which data should be collected
|
||||
:type project_id: str
|
||||
:param q_filter: Optional filters
|
||||
:type q_filter: dict
|
||||
"""
|
||||
pass
|
||||
|
||||
def retrieve(self, metric_name, start, end,
|
||||
project_id=None, q_filter=None):
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
from datetime import timedelta
|
||||
import six
|
||||
|
||||
from gnocchiclient import auth as gauth
|
||||
@ -248,8 +249,8 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
# FIXME(peschk_l): In order not to miss any resource whose metrics may
|
||||
# contain measures after its destruction, we scan resources over three
|
||||
# collect periods.
|
||||
start -= CONF.collect.period
|
||||
end += CONF.collect.period
|
||||
start -= timedelta(seconds=CONF.collect.period)
|
||||
end += timedelta(seconds=CONF.collect.period)
|
||||
query_parameters = self._generate_time_filter(start, end)
|
||||
|
||||
if project_id:
|
||||
@ -316,8 +317,8 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
return self._conn.aggregates.fetch(
|
||||
op,
|
||||
resource_type=resource_type,
|
||||
start=ck_utils.ts2dt(start),
|
||||
stop=ck_utils.ts2dt(end),
|
||||
start=start,
|
||||
stop=end,
|
||||
groupby=groupby,
|
||||
search=self.extend_filter(*query_parameters))
|
||||
except (gexceptions.MetricNotFound, gexceptions.BadRequest) as e:
|
||||
@ -388,10 +389,7 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
LOG.warning(
|
||||
'[{}] An error occured during data collection '
|
||||
'between {} and {}: {}'.format(
|
||||
project_id,
|
||||
ck_utils.ts2dt(start),
|
||||
ck_utils.ts2dt(end),
|
||||
e),
|
||||
project_id, start, end, e),
|
||||
)
|
||||
continue
|
||||
data = self.t_cloudkitty.format_item(
|
||||
|
@ -175,7 +175,7 @@ class MonascaCollector(collector.BaseCollector):
|
||||
group_by = self.conf[metric_name]['groupby']
|
||||
|
||||
# NOTE(lpeschke): One aggregated measure per collect period
|
||||
period = end - start
|
||||
period = int((end - start).total_seconds())
|
||||
|
||||
extra_args = self.conf[metric_name]['extra_args']
|
||||
kwargs = {}
|
||||
@ -186,8 +186,8 @@ class MonascaCollector(collector.BaseCollector):
|
||||
name=metric_name,
|
||||
merge_metrics=True,
|
||||
dimensions=dimensions,
|
||||
start_time=ck_utils.ts2dt(start),
|
||||
end_time=ck_utils.ts2dt(end),
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
period=period,
|
||||
statistics=extra_args['aggregation_method'],
|
||||
group_by=group_by,
|
||||
@ -210,8 +210,8 @@ class MonascaCollector(collector.BaseCollector):
|
||||
metrics = self._conn.metrics.list(
|
||||
name=metric_name,
|
||||
dimensions=dimensions,
|
||||
start_time=ck_utils.ts2dt(start),
|
||||
end_time=ck_utils.ts2dt(end),
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
)
|
||||
|
||||
resource_key = self.conf[metric_name]['extra_args']['resource_key']
|
||||
|
@ -139,7 +139,7 @@ class PrometheusCollector(collector.BaseCollector):
|
||||
method = self.conf[metric_name]['extra_args']['aggregation_method']
|
||||
groupby = self.conf[metric_name].get('groupby', [])
|
||||
metadata = self.conf[metric_name].get('metadata', [])
|
||||
period = end - start
|
||||
period = int((end - start).total_seconds())
|
||||
time = end
|
||||
|
||||
query = '{0}({0}_over_time({1}{{{2}="{3}"}}[{4}s])) by ({5})'.format(
|
||||
@ -154,7 +154,7 @@ class PrometheusCollector(collector.BaseCollector):
|
||||
try:
|
||||
res = self._conn.get_instant(
|
||||
query,
|
||||
time,
|
||||
time.isoformat(),
|
||||
)
|
||||
except PrometheusResponseError as e:
|
||||
raise CollectError(*e.args)
|
||||
|
@ -13,7 +13,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
from datetime import timedelta
|
||||
import decimal
|
||||
import functools
|
||||
import hashlib
|
||||
import multiprocessing
|
||||
import random
|
||||
@ -222,6 +224,13 @@ class APIWorker(BaseWorker):
|
||||
return price
|
||||
|
||||
|
||||
def _check_state(obj, period, tenant_id):
|
||||
timestamp = obj._state.get_state(tenant_id)
|
||||
return ck_utils.check_time_state(timestamp,
|
||||
period,
|
||||
CONF.collect.wait_periods)
|
||||
|
||||
|
||||
class Worker(BaseWorker):
|
||||
def __init__(self, collector, storage, tenant_id, worker_id):
|
||||
self._collector = collector
|
||||
@ -232,11 +241,13 @@ class Worker(BaseWorker):
|
||||
self._worker_id = worker_id
|
||||
self._conf = ck_utils.load_conf(CONF.collect.metrics_conf)
|
||||
self._state = state.StateManager()
|
||||
self._check_state = functools.partial(
|
||||
_check_state, self, self._period, self._tenant_id)
|
||||
|
||||
super(Worker, self).__init__(self._tenant_id)
|
||||
|
||||
def _collect(self, metric, start_timestamp):
|
||||
next_timestamp = start_timestamp + self._period
|
||||
next_timestamp = start_timestamp + timedelta(seconds=self._period)
|
||||
|
||||
raw_data = self._collector.retrieve(
|
||||
metric,
|
||||
@ -263,7 +274,7 @@ class Worker(BaseWorker):
|
||||
scope=self._tenant_id,
|
||||
worker=self._worker_id,
|
||||
metric=metric,
|
||||
ts=ck_utils.ts2dt(timestamp))
|
||||
ts=timestamp)
|
||||
)
|
||||
return None
|
||||
except Exception as e:
|
||||
@ -273,7 +284,7 @@ class Worker(BaseWorker):
|
||||
scope=self._tenant_id,
|
||||
worker=self._worker_id,
|
||||
metric=metric,
|
||||
ts=ck_utils.ts2dt(timestamp),
|
||||
ts=timestamp,
|
||||
e=e)
|
||||
)
|
||||
# FIXME(peschk_l): here we just exit, and the
|
||||
@ -287,15 +298,9 @@ class Worker(BaseWorker):
|
||||
eventlet.GreenPool(size=CONF.orchestrator.max_greenthreads).imap(
|
||||
_get_result, metrics)))
|
||||
|
||||
def check_state(self):
|
||||
timestamp = self._state.get_state(self._tenant_id)
|
||||
return ck_utils.check_time_state(timestamp,
|
||||
self._period,
|
||||
CONF.collect.wait_periods)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
timestamp = self.check_state()
|
||||
timestamp = self._check_state()
|
||||
if not timestamp:
|
||||
break
|
||||
|
||||
@ -340,6 +345,8 @@ class Orchestrator(cotyledon.Service):
|
||||
CONF.orchestrator.coordination_url,
|
||||
uuidutils.generate_uuid().encode('ascii'))
|
||||
self.coord.start(start_heart=True)
|
||||
self._check_state = functools.partial(
|
||||
_check_state, self, CONF.collect.period)
|
||||
|
||||
def _init_messaging(self):
|
||||
target = oslo_messaging.Target(topic='cloudkitty',
|
||||
@ -352,12 +359,6 @@ class Orchestrator(cotyledon.Service):
|
||||
self.server = messaging.get_server(target, endpoints)
|
||||
self.server.start()
|
||||
|
||||
def _check_state(self, tenant_id):
|
||||
timestamp = self._state.get_state(tenant_id)
|
||||
return ck_utils.check_time_state(timestamp,
|
||||
CONF.collect.period,
|
||||
CONF.collect.wait_periods)
|
||||
|
||||
def process_messages(self):
|
||||
# TODO(sheeprine): Code kept to handle threading and asynchronous
|
||||
# reloading
|
||||
|
@ -14,14 +14,12 @@
|
||||
# under the License.
|
||||
#
|
||||
import abc
|
||||
from datetime import timedelta
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from cloudkitty import utils as ck_utils
|
||||
# from cloudkitty.storage import NoTimeFrame
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -39,9 +37,7 @@ class BaseStorage(object):
|
||||
self._collector = kwargs.get('collector')
|
||||
|
||||
# State vars
|
||||
self.usage_start = {}
|
||||
self.usage_start_dt = {}
|
||||
self.usage_end = {}
|
||||
self.usage_end_dt = {}
|
||||
self._has_data = {}
|
||||
|
||||
@ -59,17 +55,17 @@ class BaseStorage(object):
|
||||
Removes the usage from the json data and returns it.
|
||||
:param json_data: Data to filter.
|
||||
"""
|
||||
candidate_ts = None
|
||||
candidate = None
|
||||
candidate_idx = 0
|
||||
|
||||
for idx, usage in enumerate(json_data):
|
||||
usage_ts = usage['period']['begin']
|
||||
if candidate_ts is None or usage_ts < candidate_ts:
|
||||
candidate_ts = usage_ts
|
||||
if candidate is None or usage_ts < candidate:
|
||||
candidate = usage_ts
|
||||
candidate_idx = idx
|
||||
|
||||
if candidate_ts:
|
||||
return candidate_ts, json_data.pop(candidate_idx)['usage']
|
||||
if candidate:
|
||||
return candidate, json_data.pop(candidate_idx)['usage']
|
||||
|
||||
def _pre_commit(self, tenant_id):
|
||||
"""Called before every commit.
|
||||
@ -107,8 +103,7 @@ class BaseStorage(object):
|
||||
:param begin: New usage beginning timestamp.
|
||||
:param tenant_id: tenant_id to update.
|
||||
"""
|
||||
self.usage_start[tenant_id] = begin
|
||||
self.usage_start_dt[tenant_id] = ck_utils.ts2dt(begin)
|
||||
self.usage_start_dt[tenant_id] = begin
|
||||
|
||||
def _update_end(self, end, tenant_id):
|
||||
"""Update usage_end with a new timestamp.
|
||||
@ -116,17 +111,14 @@ class BaseStorage(object):
|
||||
:param end: New usage end timestamp.
|
||||
:param tenant_id: tenant_id to update.
|
||||
"""
|
||||
self.usage_end[tenant_id] = end
|
||||
self.usage_end_dt[tenant_id] = ck_utils.ts2dt(end)
|
||||
self.usage_end_dt[tenant_id] = end
|
||||
|
||||
def _clear_usage_info(self, tenant_id):
|
||||
"""Clear usage information timestamps.
|
||||
|
||||
:param tenant_id: tenant_id which information needs to be removed.
|
||||
"""
|
||||
self.usage_start.pop(tenant_id, None)
|
||||
self.usage_start_dt.pop(tenant_id, None)
|
||||
self.usage_end.pop(tenant_id, None)
|
||||
self.usage_end_dt.pop(tenant_id, None)
|
||||
|
||||
def _check_commit(self, usage_start, tenant_id):
|
||||
@ -135,12 +127,13 @@ class BaseStorage(object):
|
||||
:param usage_start: Start of the period.
|
||||
:param tenant_id: tenant_id to check for.
|
||||
"""
|
||||
usage_end = self.usage_end.get(tenant_id)
|
||||
usage_end = self.usage_end_dt.get(tenant_id)
|
||||
if usage_end is not None and usage_start >= usage_end:
|
||||
self.commit(tenant_id)
|
||||
if self.usage_start.get(tenant_id) is None:
|
||||
if self.usage_start_dt.get(tenant_id) is None:
|
||||
self._update_start(usage_start, tenant_id)
|
||||
self._update_end(usage_start + self._period, tenant_id)
|
||||
self._update_end(
|
||||
usage_start + timedelta(seconds=self._period), tenant_id)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_state(self, tenant_id=None):
|
||||
|
@ -21,7 +21,6 @@ from cloudkitty import db
|
||||
from cloudkitty.storage.v1 import BaseStorage
|
||||
from cloudkitty.storage.v1.hybrid import migration
|
||||
from cloudkitty.storage.v1.hybrid import models
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
# NOTE(mc): The deprecated section should be removed in a future release.
|
||||
@ -79,7 +78,7 @@ class HybridStorage(BaseStorage):
|
||||
q = q.filter(self.state_model.tenant_id == tenant_id)
|
||||
q = q.order_by(self.state_model.state.desc())
|
||||
r = q.first()
|
||||
return ck_utils.dt2ts(r.state) if r else None
|
||||
return r.state if r else None
|
||||
|
||||
def _set_state(self, tenant_id, state):
|
||||
self._check_session(tenant_id)
|
||||
|
@ -80,7 +80,7 @@ class SQLAlchemyStorage(storage.BaseStorage):
|
||||
self.frame_model.begin.desc())
|
||||
r = q.first()
|
||||
if r:
|
||||
return ck_utils.dt2ts(r.begin)
|
||||
return r.begin
|
||||
|
||||
def get_total(self, begin=None, end=None, tenant_id=None,
|
||||
service=None, groupby=None):
|
||||
@ -145,16 +145,16 @@ class SQLAlchemyStorage(storage.BaseStorage):
|
||||
|
||||
def get_time_frame(self, begin, end, **filters):
|
||||
if not begin:
|
||||
begin = ck_utils.get_month_start_timestamp()
|
||||
begin = ck_utils.get_month_start()
|
||||
if not end:
|
||||
end = ck_utils.get_next_month_timestamp()
|
||||
end = ck_utils.get_next_month()
|
||||
session = db.get_session()
|
||||
q = utils.model_query(
|
||||
self.frame_model,
|
||||
session)
|
||||
q = q.filter(
|
||||
self.frame_model.begin >= ck_utils.ts2dt(begin),
|
||||
self.frame_model.end <= ck_utils.ts2dt(end))
|
||||
self.frame_model.begin >= begin,
|
||||
self.frame_model.end <= end)
|
||||
for filter_name, filter_value in filters.items():
|
||||
if filter_value:
|
||||
q = q.filter(
|
||||
|
@ -18,7 +18,6 @@ import sqlalchemy
|
||||
from sqlalchemy.ext import declarative
|
||||
|
||||
from cloudkitty import json_utils as json
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
Base = declarative.declarative_base()
|
||||
@ -74,8 +73,8 @@ class RatedDataFrame(Base, models.ModelBase):
|
||||
|
||||
# Time informations
|
||||
period_dict = {}
|
||||
period_dict['begin'] = ck_utils.dt2iso(self.begin)
|
||||
period_dict['end'] = ck_utils.dt2iso(self.end)
|
||||
period_dict['begin'] = self.begin
|
||||
period_dict['end'] = self.end
|
||||
|
||||
# Add period to the resource informations
|
||||
ck_dict = {}
|
||||
|
@ -133,7 +133,7 @@ class InfluxClient(object):
|
||||
'measurement': 'dataframes',
|
||||
'tags': measurement_tags,
|
||||
'fields': measurement_fields,
|
||||
'time': utils.ts2dt(timestamp),
|
||||
'time': timestamp,
|
||||
})
|
||||
if self._autocommit and len(self._points) >= self._chunk_size:
|
||||
self.commit()
|
||||
@ -266,15 +266,6 @@ class InfluxStorage(v2_storage.BaseStorage):
|
||||
begin = utils.get_month_start()
|
||||
if not end:
|
||||
end = utils.get_next_month()
|
||||
if isinstance(begin, six.text_type):
|
||||
begin = utils.iso2dt(begin)
|
||||
if isinstance(begin, int):
|
||||
begin = utils.ts2dt(begin)
|
||||
if isinstance(end, six.text_type):
|
||||
end = utils.iso2dt(end)
|
||||
if isinstance(end, int):
|
||||
end = utils.ts2dt(end)
|
||||
|
||||
return begin, end
|
||||
|
||||
@staticmethod
|
||||
|
@ -20,7 +20,6 @@ from oslo_log import log
|
||||
from cloudkitty import db
|
||||
from cloudkitty.storage_state import migration
|
||||
from cloudkitty.storage_state import models
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -115,8 +114,19 @@ class StateManager(object):
|
||||
|
||||
def set_state(self, identifier, state,
|
||||
fetcher=None, collector=None, scope_key=None):
|
||||
if isinstance(state, int):
|
||||
state = ck_utils.ts2dt(state)
|
||||
"""Set the state of a scope.
|
||||
|
||||
:param identifier: Identifier of the scope
|
||||
:type identifier: str
|
||||
:param state: state of the scope
|
||||
:type state: datetime.datetime
|
||||
:param fetcher: Fetcher associated to the scope
|
||||
:type fetcher: str
|
||||
:param collector: Collector associated to the scope
|
||||
:type collector: str
|
||||
:param scope_key: scope_key associated to the scope
|
||||
:type scope_key: str
|
||||
"""
|
||||
session = db.get_session()
|
||||
session.begin()
|
||||
r = self._get_db_item(
|
||||
@ -140,12 +150,24 @@ class StateManager(object):
|
||||
|
||||
def get_state(self, identifier,
|
||||
fetcher=None, collector=None, scope_key=None):
|
||||
"""Get the state of a scope.
|
||||
|
||||
:param identifier: Identifier of the scope
|
||||
:type identifier: str
|
||||
:param fetcher: Fetcher associated to the scope
|
||||
:type fetcher: str
|
||||
:param collector: Collector associated to the scope
|
||||
:type collector: str
|
||||
:param scope_key: scope_key associated to the scope
|
||||
:type scope_key: str
|
||||
:rtype: datetime.datetime
|
||||
"""
|
||||
session = db.get_session()
|
||||
session.begin()
|
||||
r = self._get_db_item(
|
||||
session, identifier, fetcher, collector, scope_key)
|
||||
session.close()
|
||||
return ck_utils.dt2ts(r.state) if r else None
|
||||
return r.state if r else None
|
||||
|
||||
def init(self):
|
||||
migration.upgrade('head')
|
||||
|
@ -19,7 +19,6 @@ import mock
|
||||
from cloudkitty.collector import monasca as mon_collector
|
||||
from cloudkitty import tests
|
||||
from cloudkitty import transformer
|
||||
from cloudkitty import utils
|
||||
|
||||
|
||||
class MonascaCollectorTest(tests.TestCase):
|
||||
@ -63,8 +62,8 @@ class MonascaCollectorTest(tests.TestCase):
|
||||
end = datetime.datetime(2019, 1, 1, 1)
|
||||
self.collector._fetch_measures(
|
||||
'metric_one',
|
||||
utils.dt2ts(start),
|
||||
utils.dt2ts(end),
|
||||
start,
|
||||
end,
|
||||
)
|
||||
m.assert_called_once_with(
|
||||
name='metric_one',
|
||||
@ -84,8 +83,8 @@ class MonascaCollectorTest(tests.TestCase):
|
||||
end = datetime.datetime(2019, 1, 1, 1)
|
||||
self.collector._fetch_measures(
|
||||
'metric_two',
|
||||
utils.dt2ts(start),
|
||||
utils.dt2ts(end),
|
||||
start,
|
||||
end,
|
||||
)
|
||||
m.assert_called_once_with(
|
||||
name='metric_two',
|
||||
|
@ -73,7 +73,7 @@ class PrometheusCollectorTest(tests.TestCase):
|
||||
)
|
||||
mock_get.assert_called_once_with(
|
||||
query,
|
||||
samples.FIRST_PERIOD_END,
|
||||
samples.FIRST_PERIOD_END.isoformat(),
|
||||
)
|
||||
|
||||
def test_format_data_instant_query(self):
|
||||
|
@ -47,7 +47,6 @@ from cloudkitty.tests.storage.v2 import influx_utils
|
||||
from cloudkitty.tests import utils as test_utils
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
INITIAL_TIMESTAMP = 1420070400
|
||||
|
||||
|
||||
@ -294,6 +293,10 @@ class QuoteFakeRPC(BaseFakeRPC):
|
||||
|
||||
class BaseStorageDataFixture(fixture.GabbiFixture):
|
||||
def create_fake_data(self, begin, end, project_id):
|
||||
if isinstance(begin, int):
|
||||
begin = ck_utils.ts2dt(begin)
|
||||
if isinstance(end, int):
|
||||
end = ck_utils.ts2dt(end)
|
||||
data = [{
|
||||
"period": {
|
||||
"begin": begin,
|
||||
@ -359,7 +362,8 @@ class StorageDataFixture(BaseStorageDataFixture):
|
||||
for i in range(data_ts,
|
||||
data_ts + data_duration,
|
||||
3600):
|
||||
data = self.create_fake_data(i, i + 3600, tenant_list[0])
|
||||
data = self.create_fake_data(
|
||||
i, i + 3600, tenant_list[0])
|
||||
self.storage.push(data, tenant_list[0])
|
||||
half_duration = int(data_duration / 2)
|
||||
for i in range(data_ts,
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
#
|
||||
import copy
|
||||
import datetime
|
||||
import decimal
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
@ -25,14 +26,14 @@ TENANT = 'f266f30b11f246b589fd266f85eeec39'
|
||||
OTHER_TENANT = '8d3ae500-89ea-4142-9c6e-1269db6a0b64'
|
||||
|
||||
INITIAL_TIMESTAMP = 1420070400
|
||||
FIRST_PERIOD_BEGIN = INITIAL_TIMESTAMP
|
||||
FIRST_PERIOD_BEGIN_ISO = ck_utils.ts2iso(FIRST_PERIOD_BEGIN)
|
||||
FIRST_PERIOD_END = FIRST_PERIOD_BEGIN + 3600
|
||||
FIRST_PERIOD_END_ISO = ck_utils.ts2iso(FIRST_PERIOD_END)
|
||||
FIRST_PERIOD_BEGIN = ck_utils.ts2dt(INITIAL_TIMESTAMP)
|
||||
FIRST_PERIOD_BEGIN_ISO = ck_utils.dt2iso(FIRST_PERIOD_BEGIN)
|
||||
FIRST_PERIOD_END = FIRST_PERIOD_BEGIN + datetime.timedelta(seconds=3600)
|
||||
FIRST_PERIOD_END_ISO = ck_utils.dt2iso(FIRST_PERIOD_END)
|
||||
SECOND_PERIOD_BEGIN = FIRST_PERIOD_END
|
||||
SECOND_PERIOD_BEGIN_ISO = ck_utils.ts2iso(SECOND_PERIOD_BEGIN)
|
||||
SECOND_PERIOD_END = SECOND_PERIOD_BEGIN + 3600
|
||||
SECOND_PERIOD_END_ISO = ck_utils.ts2iso(SECOND_PERIOD_END)
|
||||
SECOND_PERIOD_BEGIN_ISO = ck_utils.dt2iso(SECOND_PERIOD_BEGIN)
|
||||
SECOND_PERIOD_END = SECOND_PERIOD_BEGIN + datetime.timedelta(seconds=3600)
|
||||
SECOND_PERIOD_END_ISO = ck_utils.dt2iso(SECOND_PERIOD_END)
|
||||
|
||||
COMPUTE_METADATA = {
|
||||
'availability_zone': 'nova',
|
||||
@ -223,8 +224,8 @@ DEFAULT_METRICS_CONF = {
|
||||
def split_storage_data(raw_data):
|
||||
final_data = []
|
||||
for frame in raw_data:
|
||||
frame['period']['begin'] = ck_utils.ts2iso(frame['period']['begin'])
|
||||
frame['period']['end'] = ck_utils.ts2iso(frame['period']['end'])
|
||||
frame['period']['begin'] = ck_utils.dt2iso(frame['period']['begin'])
|
||||
frame['period']['end'] = ck_utils.dt2iso(frame['period']['end'])
|
||||
usage_buffer = frame.pop('usage')
|
||||
# Sort to have a consistent result as we are converting it to a list
|
||||
for service, data in sorted(usage_buffer.items()):
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
#
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
import mock
|
||||
import testscenarios
|
||||
@ -22,7 +23,6 @@ from cloudkitty import storage
|
||||
from cloudkitty import tests
|
||||
from cloudkitty.tests import samples
|
||||
from cloudkitty.tests import utils as test_utils
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
class StorageTest(tests.TestCase):
|
||||
@ -73,7 +73,8 @@ class StorageDataframeTest(StorageTest):
|
||||
self.assertRaises(
|
||||
storage.NoTimeFrame,
|
||||
self.storage.retrieve,
|
||||
begin=samples.FIRST_PERIOD_BEGIN - 3600,
|
||||
begin=(samples.FIRST_PERIOD_BEGIN
|
||||
- datetime.timedelta(seconds=3600)),
|
||||
end=samples.FIRST_PERIOD_BEGIN)
|
||||
|
||||
def test_get_frame_filter_outside_data(self):
|
||||
@ -81,7 +82,8 @@ class StorageDataframeTest(StorageTest):
|
||||
self.assertRaises(
|
||||
storage.NoTimeFrame,
|
||||
self.storage.retrieve,
|
||||
begin=samples.FIRST_PERIOD_BEGIN - 3600,
|
||||
begin=(samples.FIRST_PERIOD_BEGIN
|
||||
- datetime.timedelta(seconds=3600)),
|
||||
end=samples.FIRST_PERIOD_BEGIN)
|
||||
|
||||
def test_get_frame_without_filter_but_timestamp(self):
|
||||
@ -132,8 +134,8 @@ class StorageTotalTest(StorageTest):
|
||||
|
||||
# Total
|
||||
def test_get_empty_total(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN - 3600)
|
||||
end = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
begin = samples.FIRST_PERIOD_BEGIN - datetime.timedelta(seconds=3600)
|
||||
end = samples.FIRST_PERIOD_BEGIN
|
||||
self.insert_data()
|
||||
total = self.storage.total(
|
||||
begin=begin,
|
||||
@ -144,8 +146,8 @@ class StorageTotalTest(StorageTest):
|
||||
self.assertEqual(end, total[0]["end"])
|
||||
|
||||
def test_get_total_without_filter_but_timestamp(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
|
||||
begin = samples.FIRST_PERIOD_BEGIN
|
||||
end = samples.SECOND_PERIOD_END
|
||||
self.insert_data()
|
||||
total = self.storage.total(
|
||||
begin=begin,
|
||||
@ -157,8 +159,8 @@ class StorageTotalTest(StorageTest):
|
||||
self.assertEqual(end, total[0]["end"])
|
||||
|
||||
def test_get_total_filtering_on_one_period(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
|
||||
begin = samples.FIRST_PERIOD_BEGIN
|
||||
end = samples.FIRST_PERIOD_END
|
||||
self.insert_data()
|
||||
total = self.storage.total(
|
||||
begin=begin,
|
||||
@ -169,8 +171,8 @@ class StorageTotalTest(StorageTest):
|
||||
self.assertEqual(end, total[0]["end"])
|
||||
|
||||
def test_get_total_filtering_on_one_period_and_one_tenant(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
|
||||
begin = samples.FIRST_PERIOD_BEGIN
|
||||
end = samples.FIRST_PERIOD_END
|
||||
self.insert_data()
|
||||
filters = {'project_id': self._tenant_id}
|
||||
total = self.storage.total(
|
||||
@ -184,8 +186,8 @@ class StorageTotalTest(StorageTest):
|
||||
self.assertEqual(end, total[0]["end"])
|
||||
|
||||
def test_get_total_filtering_on_service(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
end = ck_utils.ts2dt(samples.FIRST_PERIOD_END)
|
||||
begin = samples.FIRST_PERIOD_BEGIN
|
||||
end = samples.FIRST_PERIOD_END
|
||||
self.insert_data()
|
||||
total = self.storage.total(
|
||||
begin=begin,
|
||||
@ -198,8 +200,8 @@ class StorageTotalTest(StorageTest):
|
||||
self.assertEqual(end, total[0]["end"])
|
||||
|
||||
def test_get_total_groupby_tenant(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
|
||||
begin = samples.FIRST_PERIOD_BEGIN
|
||||
end = samples.SECOND_PERIOD_END
|
||||
self.insert_data()
|
||||
total = self.storage.total(
|
||||
begin=begin,
|
||||
@ -216,8 +218,8 @@ class StorageTotalTest(StorageTest):
|
||||
self.assertEqual(end, total[1]["end"])
|
||||
|
||||
def test_get_total_groupby_restype(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
|
||||
begin = samples.FIRST_PERIOD_BEGIN
|
||||
end = samples.SECOND_PERIOD_END
|
||||
self.insert_data()
|
||||
total = self.storage.total(
|
||||
begin=begin,
|
||||
@ -234,8 +236,8 @@ class StorageTotalTest(StorageTest):
|
||||
self.assertEqual(end, total[1]["end"])
|
||||
|
||||
def test_get_total_groupby_tenant_and_restype(self):
|
||||
begin = ck_utils.ts2dt(samples.FIRST_PERIOD_BEGIN)
|
||||
end = ck_utils.ts2dt(samples.SECOND_PERIOD_END)
|
||||
begin = samples.FIRST_PERIOD_BEGIN
|
||||
end = samples.SECOND_PERIOD_END
|
||||
self.insert_data()
|
||||
total = self.storage.total(
|
||||
begin=begin,
|
||||
|
@ -137,13 +137,10 @@ class FakeInfluxClient(InfluxClient):
|
||||
|
||||
def delete(self, begin, end, filters):
|
||||
|
||||
beg = utils.dt2ts(begin) if begin else None
|
||||
end = utils.dt2ts(end) if end else None
|
||||
|
||||
def __filter_func(elem):
|
||||
|
||||
def __time(elem):
|
||||
return ((beg and beg > elem['time'])
|
||||
return ((begin and begin > elem['time'])
|
||||
or (end and end <= elem['time']))
|
||||
|
||||
def __filt(elem):
|
||||
|
@ -28,10 +28,10 @@ def generate_v2_storage_data(min_length=10,
|
||||
project_ids=None,
|
||||
start=datetime(2018, 1, 1),
|
||||
end=datetime(2018, 1, 1, 1)):
|
||||
if isinstance(start, datetime):
|
||||
start = ck_utils.dt2ts(start)
|
||||
if isinstance(end, datetime):
|
||||
end = ck_utils.dt2ts(end)
|
||||
if isinstance(start, int):
|
||||
start = ck_utils.ts2dt(start)
|
||||
if isinstance(end, int):
|
||||
end = ck_utils.ts2dt(end)
|
||||
|
||||
if not project_ids:
|
||||
project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)]
|
||||
|
@ -213,15 +213,29 @@ def refresh_stevedore(namespace=None):
|
||||
|
||||
|
||||
def check_time_state(timestamp=None, period=0, wait_periods=0):
|
||||
if not timestamp:
|
||||
return get_month_start_timestamp()
|
||||
"""Checks the state of a timestamp compared to the current time.
|
||||
|
||||
now = utcnow_ts()
|
||||
next_timestamp = timestamp + period
|
||||
wait_time = wait_periods * period
|
||||
if next_timestamp + wait_time < now:
|
||||
Returns the next timestamp based on the current timestamp and the period if
|
||||
the next timestamp is inferior to the current time and the waiting period
|
||||
or None if not.
|
||||
|
||||
:param timestamp: Current timestamp
|
||||
:type timestamp: datetime.datetime
|
||||
:param period: Period, in seconds
|
||||
:type period: int
|
||||
:param wait_periods: periods to wait before the current timestamp.
|
||||
:type wait_periods: int
|
||||
:rtype: datetime.datetime
|
||||
"""
|
||||
if not timestamp:
|
||||
return get_month_start()
|
||||
|
||||
period_delta = datetime.timedelta(seconds=period)
|
||||
next_timestamp = timestamp + period_delta
|
||||
wait_time = wait_periods * period_delta
|
||||
if next_timestamp + wait_time < utcnow():
|
||||
return next_timestamp
|
||||
return 0
|
||||
return None
|
||||
|
||||
|
||||
def load_conf(conf_path):
|
||||
|
Loading…
x
Reference in New Issue
Block a user