16b6fa5ce8
As specified by the OpenStack Style Guidelines in its 1.1.1.dev15 version, which is the latest version to this day, *author* tags should not be specified (see H105). https://docs.openstack.org/hacking/latest/user/hacking.html#general Change-Id: I0ab7359e15eb04948c7a905378001a66cb901e69
159 lines
5.4 KiB
Python
159 lines
5.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2014 Objectif Libre
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
import copy
|
|
|
|
from oslo_config import cfg
|
|
from oslo_utils import fileutils
|
|
from stevedore import named
|
|
|
|
from cloudkitty import state
|
|
from cloudkitty import storage
|
|
from cloudkitty import storage_state
|
|
from cloudkitty import utils as ck_utils
|
|
|
|
CONF = cfg.CONF
|
|
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
|
|
|
|
|
|
class WriteOrchestrator(object):
|
|
"""Write Orchestrator:
|
|
|
|
Handle incoming data from the global orchestrator, and store them in an
|
|
intermediary data format before final transformation.
|
|
"""
|
|
def __init__(self,
|
|
backend,
|
|
tenant_id,
|
|
storage,
|
|
basepath=None,
|
|
period=3600):
|
|
self._backend = backend
|
|
self._tenant_id = tenant_id
|
|
self._storage = storage
|
|
self._storage_state = storage_state.StateManager()
|
|
self._basepath = basepath
|
|
if self._basepath:
|
|
fileutils.ensure_tree(self._basepath)
|
|
self._period = period
|
|
self._sm = state.DBStateManager(self._tenant_id,
|
|
'writer_status')
|
|
self._write_pipeline = []
|
|
|
|
# State vars
|
|
self.usage_start = None
|
|
self.usage_end = None
|
|
|
|
# Current total
|
|
self.total = 0
|
|
|
|
def init_writing_pipeline(self):
|
|
CONF.import_opt('pipeline', 'cloudkitty.config', 'output')
|
|
output_pipeline = named.NamedExtensionManager(
|
|
WRITERS_NAMESPACE,
|
|
CONF.output.pipeline)
|
|
for writer in output_pipeline:
|
|
self.add_writer(writer.plugin)
|
|
|
|
def add_writer(self, writer_class):
|
|
writer = writer_class(self,
|
|
self._tenant_id,
|
|
self._backend,
|
|
self._basepath)
|
|
self._write_pipeline.append(writer)
|
|
|
|
def _update_state_manager_data(self):
|
|
self._sm.set_state(self.usage_end)
|
|
metadata = {'total': self.total}
|
|
self._sm.set_metadata(metadata)
|
|
|
|
def _load_state_manager_data(self):
|
|
timeframe = self._sm.get_state()
|
|
if timeframe:
|
|
self.usage_start = timeframe
|
|
self.usage_end = self.usage_start + self._period
|
|
metadata = self._sm.get_metadata()
|
|
if metadata:
|
|
self.total = metadata.get('total', 0)
|
|
|
|
def _dispatch(self, data):
|
|
for service in data:
|
|
# Update totals
|
|
for entry in data[service]:
|
|
self.total += entry['rating']['price']
|
|
# Dispatch data to writing pipeline
|
|
for backend in self._write_pipeline:
|
|
backend.append(data, self.usage_start, self.usage_end)
|
|
|
|
def get_timeframe(self, timeframe, timeframe_end=None):
|
|
if not timeframe_end:
|
|
timeframe_end = timeframe + self._period
|
|
try:
|
|
filters = {'project_id': self._tenant_id}
|
|
data = self._storage.retrieve(begin=timeframe,
|
|
end=timeframe_end,
|
|
filters=filters,
|
|
paginate=False)
|
|
for df in data['dataframes']:
|
|
for service, resources in df['usage'].items():
|
|
for resource in resources:
|
|
resource['desc'] = copy.deepcopy(resource['metadata'])
|
|
resource['desc'].update(resource['groupby'])
|
|
except storage.NoTimeFrame:
|
|
return None
|
|
return data
|
|
|
|
def close(self):
|
|
for writer in self._write_pipeline:
|
|
writer.close()
|
|
|
|
def _push_data(self):
|
|
data = self.get_timeframe(self.usage_start, self.usage_end)
|
|
if data and data['total'] > 0:
|
|
for timeframe in data['dataframes']:
|
|
self._dispatch(timeframe['usage'])
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def _commit_data(self):
|
|
for backend in self._write_pipeline:
|
|
backend.commit()
|
|
|
|
def reset_state(self):
|
|
self._load_state_manager_data()
|
|
self.usage_end = self._storage_state.get_state()
|
|
self._update_state_manager_data()
|
|
|
|
def restart_month(self):
|
|
self._load_state_manager_data()
|
|
month_start = ck_utils.get_month_start()
|
|
self.usage_end = ck_utils.dt2ts(month_start)
|
|
self._update_state_manager_data()
|
|
|
|
def process(self):
|
|
self._load_state_manager_data()
|
|
storage_state = self._storage_state.get_state(self._tenant_id)
|
|
if not self.usage_start:
|
|
self.usage_start = storage_state
|
|
self.usage_end = self.usage_start + self._period
|
|
while storage_state > self.usage_start:
|
|
if self._push_data():
|
|
self._commit_data()
|
|
self._update_state_manager_data()
|
|
self._load_state_manager_data()
|
|
storage_state = self._storage_state.get_state(self._tenant_id)
|
|
self.close()
|