Merge "Remove cloudkitty-writer"
This commit is contained in:
@@ -1,21 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import io
|
||||
|
||||
|
||||
class FileBackend(io.FileIO):
|
||||
def __init__(self, path, mode='a+'):
|
||||
super(FileBackend, self).__init__(path, mode)
|
||||
@@ -1,112 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import importutils as i_utils
|
||||
|
||||
from cloudkitty import config # noqa
|
||||
from cloudkitty import service
|
||||
from cloudkitty import storage
|
||||
from cloudkitty import utils as ck_utils
|
||||
from cloudkitty import write_orchestrator
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('period', 'cloudkitty.collector', 'collect')
|
||||
CONF.import_opt('backend', 'cloudkitty.config', 'output')
|
||||
CONF.import_opt('basepath', 'cloudkitty.config', 'output')
|
||||
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
|
||||
|
||||
|
||||
class DBCommand(object):
|
||||
|
||||
def __init__(self):
|
||||
self._storage = None
|
||||
self._output = None
|
||||
self._load_storage_backend()
|
||||
self._load_output_backend()
|
||||
|
||||
def _load_storage_backend(self):
|
||||
self._storage = storage.get_storage()
|
||||
|
||||
def _load_output_backend(self):
|
||||
backend = i_utils.import_class(CONF.output.backend)
|
||||
self._output = backend
|
||||
|
||||
def generate(self):
|
||||
if not CONF.command.tenant:
|
||||
if not CONF.command.begin:
|
||||
CONF.command.begin = ck_utils.get_month_start()
|
||||
if not CONF.command.end:
|
||||
CONF.command.end = ck_utils.get_next_month()
|
||||
tenants = self._storage.get_tenants(CONF.command.begin,
|
||||
CONF.command.end)
|
||||
else:
|
||||
tenants = [CONF.command.tenant]
|
||||
for tenant in tenants:
|
||||
wo = write_orchestrator.WriteOrchestrator(self._output,
|
||||
tenant,
|
||||
self._storage,
|
||||
CONF.output.basepath)
|
||||
wo.init_writing_pipeline()
|
||||
if not CONF.command.begin:
|
||||
wo.restart_month()
|
||||
wo.process()
|
||||
|
||||
def tenants_list(self):
|
||||
if not CONF.command.begin:
|
||||
CONF.command.begin = ck_utils.get_month_start()
|
||||
if not CONF.command.end:
|
||||
CONF.command.end = ck_utils.get_next_month()
|
||||
tenants = self._storage.get_tenants(CONF.command.begin,
|
||||
CONF.command.end)
|
||||
print('Tenant list:')
|
||||
for tenant in tenants:
|
||||
print(tenant)
|
||||
|
||||
|
||||
def call_generate(command_object):
|
||||
command_object.generate()
|
||||
|
||||
|
||||
def call_tenants_list(command_object):
|
||||
command_object.tenants_list()
|
||||
|
||||
|
||||
def add_command_parsers(subparsers):
|
||||
parser = subparsers.add_parser('generate')
|
||||
parser.set_defaults(func=call_generate)
|
||||
parser.add_argument('--tenant', nargs='?')
|
||||
parser.add_argument('--begin', nargs='?')
|
||||
parser.add_argument('--end', nargs='?')
|
||||
|
||||
parser = subparsers.add_parser('tenants_list')
|
||||
parser.set_defaults(func=call_tenants_list)
|
||||
parser.add_argument('--begin', nargs='?')
|
||||
parser.add_argument('--end', nargs='?')
|
||||
|
||||
|
||||
command_opt = cfg.SubCommandOpt('command',
|
||||
title='Command',
|
||||
help='Available commands',
|
||||
handler=add_command_parsers)
|
||||
|
||||
CONF.register_cli_opt(command_opt)
|
||||
|
||||
|
||||
def main():
|
||||
service.prepare_service()
|
||||
command_object = DBCommand()
|
||||
CONF.command.func(command_object)
|
||||
@@ -59,8 +59,6 @@ _opts = [
|
||||
cloudkitty.fetcher.source.fetcher_source_opts))),
|
||||
('orchestrator', list(itertools.chain(
|
||||
cloudkitty.orchestrator.orchestrator_opts))),
|
||||
('output', list(itertools.chain(
|
||||
cloudkitty.config.output_opts))),
|
||||
('storage', list(itertools.chain(
|
||||
cloudkitty.storage.storage_opts))),
|
||||
('storage_influxdb', list(itertools.chain(
|
||||
|
||||
@@ -14,24 +14,9 @@
|
||||
# under the License.
|
||||
#
|
||||
from oslo_config import cfg
|
||||
from oslo_db import options as db_options # noqa
|
||||
from oslo_messaging import opts # noqa
|
||||
from oslo_db import options as db_options
|
||||
|
||||
|
||||
output_opts = [
|
||||
cfg.StrOpt('backend',
|
||||
default='cloudkitty.backend.file.FileBackend',
|
||||
help='Backend for the output manager.'),
|
||||
cfg.StrOpt('basepath',
|
||||
default='/var/lib/cloudkitty/states/',
|
||||
help='Storage directory for the file output backend.'),
|
||||
cfg.ListOpt('pipeline',
|
||||
default=['osrf'],
|
||||
help='Output pipeline'), ]
|
||||
|
||||
|
||||
cfg.CONF.register_opts(output_opts, 'output')
|
||||
|
||||
# oslo.db defaults
|
||||
db_options.set_defaults(
|
||||
cfg.CONF,
|
||||
|
||||
@@ -1,160 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import copy
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import fileutils
|
||||
from stevedore import named
|
||||
|
||||
from cloudkitty import state
|
||||
from cloudkitty import storage
|
||||
from cloudkitty import storage_state
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
|
||||
|
||||
|
||||
class WriteOrchestrator(object):
|
||||
"""Write Orchestrator:
|
||||
|
||||
Handle incoming data from the global orchestrator, and store them in an
|
||||
intermediary data format before final transformation.
|
||||
"""
|
||||
def __init__(self,
|
||||
backend,
|
||||
tenant_id,
|
||||
storage,
|
||||
basepath=None,
|
||||
period=3600):
|
||||
self._backend = backend
|
||||
self._tenant_id = tenant_id
|
||||
self._storage = storage
|
||||
self._storage_state = storage_state.StateManager()
|
||||
self._basepath = basepath
|
||||
if self._basepath:
|
||||
fileutils.ensure_tree(self._basepath)
|
||||
self._period = period
|
||||
self._sm = state.DBStateManager(self._tenant_id,
|
||||
'writer_status')
|
||||
self._write_pipeline = []
|
||||
|
||||
# State vars
|
||||
self.usage_start = None
|
||||
self.usage_end = None
|
||||
|
||||
# Current total
|
||||
self.total = 0
|
||||
|
||||
def init_writing_pipeline(self):
|
||||
CONF.import_opt('pipeline', 'cloudkitty.config', 'output')
|
||||
output_pipeline = named.NamedExtensionManager(
|
||||
WRITERS_NAMESPACE,
|
||||
CONF.output.pipeline)
|
||||
for writer in output_pipeline:
|
||||
self.add_writer(writer.plugin)
|
||||
|
||||
def add_writer(self, writer_class):
|
||||
writer = writer_class(self,
|
||||
self._tenant_id,
|
||||
self._backend,
|
||||
self._basepath)
|
||||
self._write_pipeline.append(writer)
|
||||
|
||||
def _update_state_manager_data(self):
|
||||
self._sm.set_state(self.usage_end)
|
||||
metadata = {'total': self.total}
|
||||
self._sm.set_metadata(metadata)
|
||||
|
||||
def _load_state_manager_data(self):
|
||||
timeframe = self._sm.get_state()
|
||||
if timeframe:
|
||||
self.usage_start = timeframe
|
||||
self.usage_end = self.usage_start + self._period
|
||||
metadata = self._sm.get_metadata()
|
||||
if metadata:
|
||||
self.total = metadata.get('total', 0)
|
||||
|
||||
def _dispatch(self, data):
|
||||
for service in data:
|
||||
# Update totals
|
||||
for entry in data[service]:
|
||||
self.total += entry['rating']['price']
|
||||
# Dispatch data to writing pipeline
|
||||
for backend in self._write_pipeline:
|
||||
backend.append(data, self.usage_start, self.usage_end)
|
||||
|
||||
def get_timeframe(self, timeframe, timeframe_end=None):
|
||||
if not timeframe_end:
|
||||
timeframe_end = timeframe + self._period
|
||||
try:
|
||||
filters = {'project_id': self._tenant_id}
|
||||
data = self._storage.retrieve(begin=timeframe,
|
||||
end=timeframe_end,
|
||||
filters=filters,
|
||||
paginate=False)
|
||||
for df in data['dataframes']:
|
||||
for service, resources in df['usage'].items():
|
||||
for resource in resources:
|
||||
resource['desc'] = copy.deepcopy(resource['metadata'])
|
||||
resource['desc'].update(resource['groupby'])
|
||||
except storage.NoTimeFrame:
|
||||
return None
|
||||
return data
|
||||
|
||||
def close(self):
|
||||
for writer in self._write_pipeline:
|
||||
writer.close()
|
||||
|
||||
def _push_data(self):
|
||||
data = self.get_timeframe(self.usage_start, self.usage_end)
|
||||
if data and data['total'] > 0:
|
||||
for timeframe in data['dataframes']:
|
||||
self._dispatch(timeframe['usage'])
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _commit_data(self):
|
||||
for backend in self._write_pipeline:
|
||||
backend.commit()
|
||||
|
||||
def reset_state(self):
|
||||
self._load_state_manager_data()
|
||||
self.usage_end = self._storage_state.get_last_processed_timestamp()
|
||||
self._update_state_manager_data()
|
||||
|
||||
def restart_month(self):
|
||||
self._load_state_manager_data()
|
||||
month_start = ck_utils.get_month_start()
|
||||
self.usage_end = ck_utils.dt2ts(month_start)
|
||||
self._update_state_manager_data()
|
||||
|
||||
def process(self):
|
||||
self._load_state_manager_data()
|
||||
storage_state = self._storage_state.get_last_processed_timestamp(
|
||||
self._tenant_id)
|
||||
if not self.usage_start:
|
||||
self.usage_start = storage_state
|
||||
self.usage_end = self.usage_start + self._period
|
||||
while storage_state > self.usage_start:
|
||||
if self._push_data():
|
||||
self._commit_data()
|
||||
self._update_state_manager_data()
|
||||
self._load_state_manager_data()
|
||||
storage_state = self._storage_state.get_last_processed_timestamp(
|
||||
self._tenant_id)
|
||||
self.close()
|
||||
@@ -1,162 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import abc
|
||||
|
||||
from cloudkitty import state
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
class BaseReportWriter(object, metaclass=abc.ABCMeta):
|
||||
"""Base report writer."""
|
||||
report_type = None
|
||||
|
||||
def __init__(self, write_orchestrator, tenant_id, backend, basepath=None):
|
||||
self._write_orchestrator = write_orchestrator
|
||||
self._backend = backend
|
||||
self._tenant_id = tenant_id
|
||||
self._sm = state.DBStateManager(self._tenant_id,
|
||||
self.report_type)
|
||||
self._report = None
|
||||
self._period = 3600
|
||||
|
||||
self._basepath = basepath
|
||||
|
||||
# State vars
|
||||
self.checked_first_line = False
|
||||
self.usage_start = None
|
||||
self.usage_start_dt = None
|
||||
self.usage_end = None
|
||||
self.usage_end_dt = None
|
||||
|
||||
# Current total
|
||||
self.total = 0
|
||||
|
||||
# Current usage period lines
|
||||
self._usage_data = {}
|
||||
|
||||
@abc.abstractmethod
|
||||
def _gen_filename(self):
|
||||
"""Filename generation
|
||||
|
||||
"""
|
||||
|
||||
def _open(self):
|
||||
filename = self._gen_filename()
|
||||
self._report = self._backend(filename, 'wb+')
|
||||
self._report.seek(0, 2)
|
||||
|
||||
def _get_report_size(self):
|
||||
return self._report.tell()
|
||||
|
||||
@abc.abstractmethod
|
||||
def _recover_state(self):
|
||||
"""Recover state from a last run.
|
||||
|
||||
"""
|
||||
|
||||
def _update_state_manager(self):
|
||||
self._sm.set_state(self.usage_end)
|
||||
metadata = {'total': self.total}
|
||||
self._sm.set_metadata(metadata)
|
||||
|
||||
def _get_state_manager_timeframe(self):
|
||||
timeframe = self._sm.get_state()
|
||||
self.usage_start = timeframe
|
||||
self.usage_start_dt = ck_utils.ts2dt(timeframe)
|
||||
self.usage_end = timeframe + self._period
|
||||
self.usage_end_dt = ck_utils.ts2dt(self.usage_end)
|
||||
metadata = self._sm.get_metadata()
|
||||
self.total = metadata.get('total', 0)
|
||||
|
||||
def get_timeframe(self, timeframe):
|
||||
return self._write_orchestrator.get_timeframe(timeframe)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _write_header(self):
|
||||
"""Write report headers
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def _write_total(self):
|
||||
"""Write current total
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def _write(self):
|
||||
"""Write report content
|
||||
|
||||
"""
|
||||
|
||||
def _pre_commit(self):
|
||||
if self._report is None:
|
||||
self._open()
|
||||
if not self.checked_first_line:
|
||||
if self._get_report_size() == 0:
|
||||
self._write_header()
|
||||
else:
|
||||
self._recover_state()
|
||||
self.checked_first_line = True
|
||||
else:
|
||||
self._recover_state()
|
||||
|
||||
def _commit(self):
|
||||
self._pre_commit()
|
||||
|
||||
self._write()
|
||||
self._update_state_manager()
|
||||
|
||||
self._post_commit()
|
||||
|
||||
def _post_commit(self):
|
||||
self._usage_data = {}
|
||||
self._write_total()
|
||||
|
||||
def _update(self, data):
|
||||
for service in data:
|
||||
if service in self._usage_data:
|
||||
self._usage_data[service].extend(data[service])
|
||||
else:
|
||||
self._usage_data[service] = data[service]
|
||||
# Update totals
|
||||
for entry in data[service]:
|
||||
self.total += entry['rating']['price']
|
||||
|
||||
def append(self, data, start, end):
|
||||
# FIXME we should use the real time values
|
||||
if self.usage_end is not None and start >= self.usage_end:
|
||||
self.usage_start = None
|
||||
|
||||
if self.usage_start is None:
|
||||
self.usage_start = start
|
||||
self.usage_end = start + self._period
|
||||
self.usage_start_dt = ck_utils.ts2dt(self.usage_start)
|
||||
self.usage_end_dt = ck_utils.ts2dt(self.usage_end)
|
||||
|
||||
self._update(data)
|
||||
|
||||
def commit(self):
|
||||
self._commit()
|
||||
|
||||
@abc.abstractmethod
|
||||
def _close_file(self):
|
||||
"""Close report file
|
||||
|
||||
"""
|
||||
|
||||
def close(self):
|
||||
self._close_file()
|
||||
@@ -1,251 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import collections
|
||||
import csv
|
||||
import datetime
|
||||
import os
|
||||
|
||||
from cloudkitty import utils as ck_utils
|
||||
from cloudkitty import writer
|
||||
|
||||
|
||||
class InconsistentHeaders(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BaseCSVBackend(writer.BaseReportWriter):
|
||||
"""Report format writer:
|
||||
|
||||
Generates report in csv format
|
||||
"""
|
||||
report_type = 'csv'
|
||||
|
||||
def __init__(self, write_orchestrator, user_id, backend, basepath):
|
||||
super(BaseCSVBackend, self).__init__(write_orchestrator,
|
||||
user_id,
|
||||
backend,
|
||||
basepath)
|
||||
|
||||
# Detailed transform OrderedDict
|
||||
self._field_map = collections.OrderedDict()
|
||||
|
||||
self._headers = []
|
||||
self._headers_len = 0
|
||||
self._extra_headers = []
|
||||
self._extra_headers_len = 0
|
||||
|
||||
# File vars
|
||||
self._csv_report = None
|
||||
|
||||
# State variables
|
||||
self.cached_start = None
|
||||
self.cached_start_str = ''
|
||||
self.cached_end = None
|
||||
self.cached_end_str = ''
|
||||
self._crumpled = False
|
||||
|
||||
# Current usage period lines
|
||||
self._usage_data = []
|
||||
|
||||
def _gen_filename(self, timeframe):
|
||||
filename = ('{}-{}-{:02d}.csv').format(self._tenant_id,
|
||||
timeframe.year,
|
||||
timeframe.month)
|
||||
if self._basepath:
|
||||
filename = os.path.join(self._basepath, filename)
|
||||
return filename
|
||||
|
||||
def _open(self):
|
||||
filename = self._gen_filename(self.usage_start_dt)
|
||||
self._report = self._backend(filename, 'rb+')
|
||||
self._csv_report = csv.writer(self._report)
|
||||
self._report.seek(0, 2)
|
||||
|
||||
def _close_file(self):
|
||||
if self._report is not None:
|
||||
self._report.close()
|
||||
|
||||
def _get_state_manager_timeframe(self):
|
||||
if self.report_type is None:
|
||||
raise NotImplementedError()
|
||||
|
||||
def _update_state_manager(self):
|
||||
if self.report_type is None:
|
||||
raise NotImplementedError()
|
||||
|
||||
super(BaseCSVBackend, self)._update_state_manager()
|
||||
|
||||
metadata = {'total': self.total}
|
||||
metadata['headers'] = self._extra_headers
|
||||
self._sm.set_metadata(metadata)
|
||||
|
||||
def _init_headers(self):
|
||||
headers = self._field_map.keys()
|
||||
for header in headers:
|
||||
if ':*' in header:
|
||||
continue
|
||||
self._headers.append(header)
|
||||
self._headers_len = len(self._headers)
|
||||
|
||||
def _write_header(self):
|
||||
self._csv_report.writerow(self._headers + self._extra_headers)
|
||||
|
||||
def _write(self):
|
||||
self._csv_report.writerows(self._usage_data)
|
||||
|
||||
def _post_commit(self):
|
||||
self._crumpled = False
|
||||
self._usage_data = []
|
||||
self._write_total()
|
||||
|
||||
def _update(self, data):
|
||||
"""Dispatch report data with context awareness.
|
||||
|
||||
"""
|
||||
if self._crumpled:
|
||||
return
|
||||
try:
|
||||
for service in data:
|
||||
for report_data in data[service]:
|
||||
self._process_data(service, report_data)
|
||||
self.total += report_data['rating']['price']
|
||||
except InconsistentHeaders:
|
||||
self._crumple()
|
||||
self._crumpled = True
|
||||
|
||||
def _recover_state(self):
|
||||
# Rewind 3 lines
|
||||
self._report.seek(0, 2)
|
||||
buf_size = self._report.tell()
|
||||
if buf_size > 2000:
|
||||
buf_size = 2000
|
||||
elif buf_size == 0:
|
||||
return
|
||||
self._report.seek(-buf_size, 2)
|
||||
end_buf = self._report.read()
|
||||
last_line = buf_size
|
||||
for dummy in range(4):
|
||||
last_line = end_buf.rfind('\n', 0, last_line)
|
||||
if last_line > 0:
|
||||
last_line -= len(end_buf) - 1
|
||||
else:
|
||||
raise RuntimeError('Unable to recover file state.')
|
||||
self._report.seek(last_line, 2)
|
||||
self._report.truncate()
|
||||
|
||||
def _crumple(self):
|
||||
# Reset states
|
||||
self._usage_data = []
|
||||
self.total = 0
|
||||
|
||||
# Recover state from file
|
||||
if self._report is not None:
|
||||
self._report.seek(0)
|
||||
reader = csv.reader(self._report)
|
||||
# Skip header
|
||||
for dummy in range(2):
|
||||
line = reader.next()
|
||||
self.usage_start_dt = datetime.datetime.strptime(
|
||||
line[0],
|
||||
'%Y/%m/%d %H:%M:%S')
|
||||
self.usage_start = ck_utils.dt2ts(self.usage_start_dt)
|
||||
self.usage_end_dt = datetime.datetime.strptime(
|
||||
line[1],
|
||||
'%Y/%m/%d %H:%M:%S')
|
||||
self.usage_end = ck_utils.dt2ts(self.usage_end_dt)
|
||||
|
||||
# Reset file
|
||||
self._report.seek(0)
|
||||
self._report.truncate()
|
||||
self._write_header()
|
||||
|
||||
timeframe = self._write_orchestrator.get_timeframe(
|
||||
self.usage_start)
|
||||
start = self.usage_start
|
||||
self.usage_start = None
|
||||
for data in timeframe:
|
||||
self.append(data['usage'],
|
||||
start,
|
||||
None)
|
||||
self.usage_start = self.usage_end
|
||||
|
||||
def _update_extra_headers(self, new_head):
|
||||
self._extra_headers.append(new_head)
|
||||
self._extra_headers.sort()
|
||||
self._extra_headers_len += 1
|
||||
|
||||
def _allocate_extra(self, line):
|
||||
for dummy in range(self._extra_headers_len):
|
||||
line.append('')
|
||||
|
||||
def _map_wildcard(self, base, report_data):
|
||||
wildcard_line = []
|
||||
headers_changed = False
|
||||
self._allocate_extra(wildcard_line)
|
||||
base_section, dummy = base.split(':')
|
||||
if not report_data:
|
||||
return []
|
||||
for field in report_data:
|
||||
col_name = base_section + ':' + field
|
||||
if col_name not in self._extra_headers:
|
||||
self._update_extra_headers(col_name)
|
||||
headers_changed = True
|
||||
else:
|
||||
idx = self._extra_headers.index(col_name)
|
||||
wildcard_line[idx] = report_data[field]
|
||||
if headers_changed:
|
||||
raise InconsistentHeaders('Headers value changed'
|
||||
', need to rebuild.')
|
||||
return wildcard_line
|
||||
|
||||
def _recurse_sections(self, sections, data):
|
||||
if not sections.count(':'):
|
||||
return data.get(sections, '')
|
||||
fields = sections.split(':')
|
||||
cur_data = data
|
||||
for field in fields:
|
||||
if field in cur_data:
|
||||
cur_data = cur_data[field]
|
||||
else:
|
||||
return None
|
||||
return cur_data
|
||||
|
||||
def _process_data(self, context, report_data):
|
||||
"""Transform the raw json data to the final CSV values.
|
||||
|
||||
"""
|
||||
if not self._headers_len:
|
||||
self._init_headers()
|
||||
|
||||
formated_data = []
|
||||
for base, mapped in self._field_map.items():
|
||||
final_data = ''
|
||||
if isinstance(mapped, str):
|
||||
mapped_section, mapped_field = mapped.rsplit(':', 1)
|
||||
data = self._recurse_sections(mapped_section, report_data)
|
||||
if mapped_field == '*':
|
||||
extra_fields = self._map_wildcard(base, data)
|
||||
formated_data.extend(extra_fields)
|
||||
continue
|
||||
elif mapped_section in report_data:
|
||||
data = report_data[mapped_section]
|
||||
if mapped_field in data:
|
||||
final_data = data[mapped_field]
|
||||
elif mapped is not None:
|
||||
final_data = mapped(context, report_data)
|
||||
formated_data.append(final_data)
|
||||
|
||||
self._usage_data.append(formated_data)
|
||||
@@ -1,150 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import collections
|
||||
import datetime
|
||||
|
||||
from cloudkitty.writer import csv_base
|
||||
|
||||
|
||||
class CSVMapped(csv_base.BaseCSVBackend):
|
||||
report_type = 'csv'
|
||||
|
||||
def __init__(self, write_orchestrator, user_id, backend, state_backend):
|
||||
super(CSVMapped, self).__init__(write_orchestrator,
|
||||
user_id,
|
||||
backend,
|
||||
state_backend)
|
||||
|
||||
# Detailed transform dict
|
||||
self._field_map = collections.OrderedDict(
|
||||
[('UsageStart', self._trans_get_usage_start),
|
||||
('UsageEnd', self._trans_get_usage_end),
|
||||
('ResourceId', self._trans_res_id),
|
||||
('Operation', self._trans_operation),
|
||||
('UserId', 'desc:user_id'),
|
||||
('ProjectId', 'desc:project_id'),
|
||||
('ItemName', 'desc:name'),
|
||||
('ItemFlavor', 'desc:flavor_name'),
|
||||
('ItemFlavorId', 'desc:flavor_id'),
|
||||
('AvailabilityZone', 'desc:availability_zone'),
|
||||
('Service', self._trans_service),
|
||||
('UsageQuantity', 'vol:qty'),
|
||||
('RateValue', 'rating:price'),
|
||||
('Cost', self._trans_calc_cost),
|
||||
('user:*', 'desc:metadata:*')])
|
||||
|
||||
def _write_total(self):
|
||||
lines = [[''] * self._headers_len for i in range(3)]
|
||||
for i in range(len(lines)):
|
||||
lines[i][1] = self._tenant_id
|
||||
|
||||
lines[1][2] = self._tenant_id
|
||||
|
||||
lines[0][3] = 'InvoiceTotal'
|
||||
lines[1][3] = 'AccountTotal'
|
||||
lines[2][3] = 'StatementTotal'
|
||||
|
||||
lines[0][5] = 'Total amount for invoice'
|
||||
lines[1][5] = 'Total for linked account# {}'.format(self._tenant_id)
|
||||
start_month = datetime.datetime(
|
||||
self.usage_start_dt.year,
|
||||
self.usage_start_dt.month,
|
||||
1)
|
||||
lines[2][5] = ('Total statement amount for period '
|
||||
'{} - {}').format(self._format_date(start_month),
|
||||
self._get_usage_end())
|
||||
|
||||
lines[0][8] = self.total
|
||||
lines[1][8] = self.total
|
||||
lines[2][8] = self.total
|
||||
|
||||
self._csv_report.writerows(lines)
|
||||
|
||||
@staticmethod
|
||||
def _format_date(raw_dt):
|
||||
return raw_dt.strftime('%Y/%m/%d %H:%M:%S')
|
||||
|
||||
def _get_usage_start(self):
|
||||
"""Get the start usage of this period.
|
||||
|
||||
"""
|
||||
if self.cached_start == self.usage_start:
|
||||
return self.cached_start_str
|
||||
else:
|
||||
self.cached_start = self.usage_start
|
||||
self.cached_start_str = self._format_date(self.usage_start_dt)
|
||||
return self.cached_start_str
|
||||
|
||||
def _get_usage_end(self):
|
||||
"""Get the end usage of this period.
|
||||
|
||||
"""
|
||||
if self.cached_start == self.usage_start and self.cached_end_str \
|
||||
and self.cached_end > self.cached_start:
|
||||
return self.cached_end_str
|
||||
else:
|
||||
usage_end = self.usage_start_dt + datetime.timedelta(
|
||||
seconds=self._period)
|
||||
self.cached_end_str = self._format_date(usage_end)
|
||||
return self.cached_end_str
|
||||
|
||||
def _trans_get_usage_start(self, _context, _report_data):
|
||||
"""Dummy transformation function to comply with the standard.
|
||||
|
||||
"""
|
||||
return self._get_usage_start()
|
||||
|
||||
def _trans_get_usage_end(self, _context, _report_data):
|
||||
"""Dummy transformation function to comply with the standard.
|
||||
|
||||
"""
|
||||
return self._get_usage_end()
|
||||
|
||||
def _trans_product_name(self, context, _report_data):
|
||||
"""Context dependent product name translation.
|
||||
|
||||
"""
|
||||
if context == 'compute' or context == 'instance':
|
||||
return 'Nova Computing'
|
||||
else:
|
||||
return context
|
||||
|
||||
def _trans_operation(self, context, _report_data):
|
||||
"""Context dependent operation translation.
|
||||
|
||||
"""
|
||||
if context == 'compute' or context == 'instance':
|
||||
return 'RunInstances'
|
||||
|
||||
def _trans_res_id(self, context, report_data):
|
||||
"""Context dependent resource id transformation function.
|
||||
|
||||
"""
|
||||
return report_data['desc'].get('resource_id')
|
||||
|
||||
def _trans_calc_cost(self, context, report_data):
|
||||
"""Cost calculation function.
|
||||
|
||||
"""
|
||||
try:
|
||||
quantity = report_data['vol'].get('qty')
|
||||
rate = report_data['rating'].get('price')
|
||||
return str(float(quantity) * rate)
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
def _trans_service(self, context, report_data):
|
||||
return context
|
||||
@@ -1,91 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import os
|
||||
|
||||
from cloudkitty.utils import json
|
||||
from cloudkitty import writer
|
||||
|
||||
|
||||
class OSRFBackend(writer.BaseReportWriter):
|
||||
"""OpenStack Report Format Writer:
|
||||
|
||||
Generates report in native format (json)
|
||||
"""
|
||||
report_type = 'osrf'
|
||||
|
||||
def _gen_filename(self, timeframe):
|
||||
filename = '{}-osrf-{}-{:02d}.json'.format(self._tenant_id,
|
||||
timeframe.year,
|
||||
timeframe.month)
|
||||
if self._basepath:
|
||||
filename = os.path.join(self._basepath, filename)
|
||||
return filename
|
||||
|
||||
def _open(self):
|
||||
filename = self._gen_filename(self.usage_start_dt)
|
||||
self._report = self._backend(filename, 'rb+')
|
||||
self._report.seek(0, 2)
|
||||
if self._report.tell():
|
||||
self._recover_state()
|
||||
else:
|
||||
self._report.seek(0)
|
||||
|
||||
def _write_header(self):
|
||||
self._report.write('[')
|
||||
self._report.flush()
|
||||
|
||||
def _write_total(self):
|
||||
total = {'total': self.total}
|
||||
self._report.write(json.dumps(total))
|
||||
self._report.write(']')
|
||||
self._report.flush()
|
||||
|
||||
def _recover_state(self):
|
||||
# Search for last comma
|
||||
self._report.seek(0, 2)
|
||||
max_idx = self._report.tell()
|
||||
if max_idx > 2000:
|
||||
max_idx = 2000
|
||||
hay = ''
|
||||
for idx in range(10, max_idx, 10):
|
||||
self._report.seek(-idx, 2)
|
||||
hay = self._report.read()
|
||||
if hay.count(','):
|
||||
break
|
||||
last_comma = hay.rfind(',')
|
||||
if last_comma > -1:
|
||||
last_comma -= len(hay)
|
||||
else:
|
||||
raise RuntimeError('Unable to recover file state.')
|
||||
self._report.seek(last_comma, 2)
|
||||
self._report.write(', ')
|
||||
self._report.truncate()
|
||||
|
||||
def _close_file(self):
|
||||
if self._report is not None:
|
||||
self._recover_state()
|
||||
self._write_total()
|
||||
self._report.close()
|
||||
|
||||
def _write(self):
|
||||
data = {}
|
||||
data['period'] = {'begin': self.usage_start_dt.isoformat(),
|
||||
'end': self.usage_end_dt.isoformat()}
|
||||
data['usage'] = self._usage_data
|
||||
|
||||
self._report.write(json.dumps(data))
|
||||
self._report.write(', ')
|
||||
self._report.flush()
|
||||
@@ -82,7 +82,6 @@ function is_cloudkitty_enabled {
|
||||
function cleanup_cloudkitty {
|
||||
# Clean up dirs
|
||||
rm -rf $CLOUDKITTY_CONF_DIR/*
|
||||
rm -rf $CLOUDKITTY_OUTPUT_BASEPATH/*
|
||||
for i in $(find $CLOUDKITTY_ENABLED_DIR -iname '_[0-9]*.py' -printf '%f\n'); do
|
||||
rm -f "${CLOUDKITTY_HORIZON_ENABLED_DIR}/$i"
|
||||
done
|
||||
@@ -184,11 +183,6 @@ function configure_cloudkitty {
|
||||
# when starting a devstack installation, but is NOT a recommended setting
|
||||
iniset $CLOUDKITTY_CONF collect wait_periods 0
|
||||
|
||||
# output
|
||||
iniset $CLOUDKITTY_CONF output backend $CLOUDKITTY_OUTPUT_BACKEND
|
||||
iniset $CLOUDKITTY_CONF output basepath $CLOUDKITTY_OUTPUT_BASEPATH
|
||||
iniset $CLOUDKITTY_CONF output pipeline $CLOUDKITTY_OUTPUT_PIPELINE
|
||||
|
||||
# storage
|
||||
iniset $CLOUDKITTY_CONF storage backend $CLOUDKITTY_STORAGE_BACKEND
|
||||
iniset $CLOUDKITTY_CONF storage version $CLOUDKITTY_STORAGE_VERSION
|
||||
@@ -247,11 +241,6 @@ function create_opensearch_index {
|
||||
|
||||
# init_cloudkitty() - Initialize CloudKitty database
|
||||
function init_cloudkitty {
|
||||
# Delete existing cache
|
||||
sudo rm -rf $CLOUDKITTY_OUTPUT_BASEPATH
|
||||
sudo mkdir -p $CLOUDKITTY_OUTPUT_BASEPATH
|
||||
sudo chown $STACK_USER $CLOUDKITTY_OUTPUT_BASEPATH
|
||||
|
||||
# (Re)create cloudkitty database
|
||||
recreate_database cloudkitty utf8
|
||||
|
||||
|
||||
@@ -55,11 +55,6 @@ CLOUDKITTY_STORAGE_BACKEND=${CLOUDKITTY_STORAGE_BACKEND:-"influxdb"}
|
||||
CLOUDKITTY_STORAGE_VERSION=${CLOUDKITTY_STORAGE_VERSION:-"2"}
|
||||
CLOUDKITTY_INFLUX_VERSION=${CLOUDKITTY_INFLUX_VERSION:-1}
|
||||
|
||||
# Set CloudKitty output info
|
||||
CLOUDKITTY_OUTPUT_BACKEND=${CLOUDKITTY_OUTPUT_BACKEND:-"cloudkitty.backend.file.FileBackend"}
|
||||
CLOUDKITTY_OUTPUT_BASEPATH=${CLOUDKITTY_OUTPUT_BASEPATH:-$CLOUDKITTY_REPORTS_DIR}
|
||||
CLOUDKITTY_OUTPUT_PIPELINE=${CLOUDKITTY_OUTPUT_PIPELINE:-"osrf"}
|
||||
|
||||
# Set Cloudkitty client info
|
||||
GITREPO["python-cloudkittyclient"]=${CLOUDKITTYCLIENT_REPO:-${GIT_BASE}/openstack/python-cloudkittyclient.git}
|
||||
GITDIR["python-cloudkittyclient"]=$DEST/python-cloudkittyclient
|
||||
|
||||
@@ -17,7 +17,6 @@ following executables:
|
||||
* ``cloudkitty-processor``: Processing service (collecting and rating)
|
||||
* ``cloudkitty-dbsync``: Tool to create and upgrade the database schema
|
||||
* ``cloudkitty-storage-init``: Tool to initiate the storage backend
|
||||
* ``cloudkitty-writer``: Reporting tool
|
||||
|
||||
Install sample configuration files::
|
||||
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
The ``cloudkitty-writer`` CLI has been removed. The CLI has been
|
||||
unmaintained for a long time and has not been functional for multiple
|
||||
releases. Use the report API instead. Also, the ``[output]`` configuration
|
||||
section has been removed.
|
||||
@@ -28,7 +28,6 @@ console_scripts =
|
||||
cloudkitty-dbsync = cloudkitty.cli.dbsync:main
|
||||
cloudkitty-processor = cloudkitty.cli.processor:main
|
||||
cloudkitty-storage-init = cloudkitty.cli.storage:main
|
||||
cloudkitty-writer = cloudkitty.cli.writer:main
|
||||
cloudkitty-status = cloudkitty.cli.status:main
|
||||
|
||||
wsgi_scripts =
|
||||
@@ -73,7 +72,3 @@ cloudkitty.storage.v2.backends =
|
||||
|
||||
cloudkitty.storage.hybrid.backends =
|
||||
gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage
|
||||
|
||||
cloudkitty.output.writers =
|
||||
osrf = cloudkitty.writer.osrf:OSRFBackend
|
||||
csv = cloudkitty.writer.csv_map:CSVMapped
|
||||
|
||||
Reference in New Issue
Block a user