Separated writing and rating processing

Added a new process cloudkitty-writer to handle report generation.
Removed writer references from the orchestrator.
Removed OSRTFBackend as the storage backend is now driver based.
Modified write_orchestrator to reflect changes made on the storage code.

Change-Id: I201448892a02796d23f11a92d95d3e8a3992b961
This commit is contained in:
Stéphane Albert
2014-11-14 09:57:24 +01:00
parent 734c2fae8d
commit 2398f39058
4 changed files with 131 additions and 179 deletions

57
cloudkitty/cli/writer.py Normal file
View File

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from oslo.config import cfg
from stevedore import driver
from cloudkitty import config # noqa
from cloudkitty.openstack.common import importutils as i_utils
from cloudkitty import service
from cloudkitty import write_orchestrator
CONF = cfg.CONF
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
def load_storage_backend():
    """Instantiate and return the configured storage backend driver.

    The backend name is read from the ``[storage] backend`` option and
    resolved through the stevedore driver namespace.
    """
    CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
    manager = driver.DriverManager(
        STORAGES_NAMESPACE,
        CONF.storage.backend,
        invoke_on_load=True,
        # Every storage driver is constructed with the collect period.
        invoke_kwds={'period': CONF.collect.period})
    return manager.driver
def load_output_backend():
    """Import and return the configured output backend class.

    Reads the ``[output] backend`` option and resolves the dotted class
    path with importutils.
    """
    CONF.import_opt('backend', 'cloudkitty.config', 'output')
    return i_utils.import_class(CONF.output.backend)
def main():
    """Entry point for the cloudkitty-writer console script.

    Loads the output and storage backends, builds a WriteOrchestrator,
    then replays the current month through the writing pipeline.
    """
    service.prepare_service()
    output = load_output_backend()
    storage = load_storage_backend()
    orchestrator = write_orchestrator.WriteOrchestrator(output,
                                                        'writer',
                                                        storage)
    orchestrator.init_writing_pipeline()
    # Restart from the beginning of the current month before processing.
    orchestrator.restart_month()
    orchestrator.process()

View File

@@ -24,17 +24,14 @@ from oslo.config import cfg
from oslo import messaging from oslo import messaging
from stevedore import driver from stevedore import driver
from stevedore import extension from stevedore import extension
from stevedore import named
from cloudkitty.common import rpc from cloudkitty.common import rpc
from cloudkitty import config # NOQA from cloudkitty import config # NOQA
from cloudkitty import extension_manager from cloudkitty import extension_manager
from cloudkitty.openstack.common import importutils as i_utils
from cloudkitty.openstack.common import lockutils from cloudkitty.openstack.common import lockutils
from cloudkitty.openstack.common import log as logging from cloudkitty.openstack.common import log as logging
from cloudkitty import state from cloudkitty import state
from cloudkitty import utils as ck_utils from cloudkitty import utils as ck_utils
from cloudkitty import write_orchestrator as w_orch
eventlet.monkey_patch() eventlet.monkey_patch()
@@ -46,7 +43,6 @@ COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers' TRANSFORMERS_NAMESPACE = 'cloudkitty.transformers'
PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors' PROCESSORS_NAMESPACE = 'cloudkitty.billing.processors'
STORAGES_NAMESPACE = 'cloudkitty.storage.backends' STORAGES_NAMESPACE = 'cloudkitty.storage.backends'
WRITERS_NAMESPACE = 'cloudkitty.output.writers'
class BillingEndpoint(object): class BillingEndpoint(object):
@@ -126,12 +122,6 @@ class Orchestrator(object):
invoke_on_load=True, invoke_on_load=True,
invoke_kwds=collector_args).driver invoke_kwds=collector_args).driver
w_backend = i_utils.import_class(CONF.output.backend)
self.wo = w_orch.WriteOrchestrator(w_backend,
self.keystone.user_id,
self.sm,
basepath=CONF.output.basepath)
CONF.import_opt('backend', 'cloudkitty.storage', 'storage') CONF.import_opt('backend', 'cloudkitty.storage', 'storage')
storage_args = {'period': CONF.collect.period} storage_args = {'period': CONF.collect.period}
self.storage = driver.DriverManager( self.storage = driver.DriverManager(
@@ -144,13 +134,6 @@ class Orchestrator(object):
self.b_processors = {} self.b_processors = {}
self._load_billing_processors() self._load_billing_processors()
# Output settings
output_pipeline = named.NamedExtensionManager(
WRITERS_NAMESPACE,
CONF.output.pipeline)
for writer in output_pipeline:
self.wo.add_writer(writer.plugin)
# RPC # RPC
self.server = None self.server = None
self._billing_endpoint = BillingEndpoint(self) self._billing_endpoint = BillingEndpoint(self)
@@ -262,14 +245,10 @@ class Orchestrator(object):
processor.process(data) processor.process(data)
# Writing # Writing
# Copy data to keep old behaviour with write_orchestrator
wo_data = list(data)
self.wo.append(wo_data)
self.storage.append(data) self.storage.append(data)
# We're getting a full period so we directly commit # We're getting a full period so we directly commit
self.wo.commit()
self.storage.commit() self.storage.commit()
def terminate(self): def terminate(self):
self.wo.close() pass

View File

@@ -15,60 +15,15 @@
# #
# @author: Stéphane Albert # @author: Stéphane Albert
# #
import datetime from oslo.config import cfg
import json from stevedore import named
import os.path
import zipfile
import cloudkitty.utils as utils from cloudkitty import state
from cloudkitty import storage
from cloudkitty import utils as ck_utils
CONF = cfg.CONF
class OSRTFBackend(object): WRITERS_NAMESPACE = 'cloudkitty.output.writers'
"""Native backend for transient report storage.
Used to store data from the output of the billing pipeline.
"""
def __init__(self):
self._osrtf = None
def open(self, filename):
# FIXME(sheeprine): ZipFile is working well with filename
# but not backend
self._osrtf = zipfile.ZipFile(filename, 'a')
def _gen_filename(self, timeframe):
filename = '{}-{:02d}-{:02d}-{}-{}.json'.format(timeframe.year,
timeframe.month,
timeframe.day,
timeframe.hour,
timeframe.minute)
return filename
def _file_exists(self, filename):
for file_info in self._osrtf.infolist():
if file_info.filename == filename:
return True
return False
def add(self, timeframe, data):
"""Add the data to the OpenStack Report Transient Format."""
filename = self._gen_filename(timeframe)
# We can only check for the existence of a file not rewrite or delete
# it
if not self._file_exists(filename):
self._osrtf.writestr(filename, json.dumps(data))
def get(self, timeframe):
try:
filename = self._gen_filename(timeframe)
data = json.loads(self._osrtf.read(filename))
return data
except Exception:
pass
def close(self):
self._osrtf.close()
class WriteOrchestrator(object): class WriteOrchestrator(object):
@@ -80,28 +35,32 @@ class WriteOrchestrator(object):
def __init__(self, def __init__(self,
backend, backend,
user_id, user_id,
state_manager, storage,
basepath=None, basepath=None,
period=3600): period=3600):
self._backend = backend self._backend = backend
self._uid = user_id self._uid = user_id
self._period = period self._storage = storage
self._sm = state_manager
self._basepath = basepath self._basepath = basepath
self._osrtf = None self._period = period
self._sm = state.DBStateManager(self._uid,
'writer_status')
self._write_pipeline = [] self._write_pipeline = []
# State vars # State vars
self.usage_start = None self.usage_start = None
self.usage_start_dt = None
self.usage_end = None self.usage_end = None
self.usage_end_dt = None
# Current total # Current total
self.total = 0 self.total = 0
# Current usage period lines def init_writing_pipeline(self):
self._usage_data = {} CONF.import_opt('pipeline', 'cloudkitty.config', 'output')
output_pipeline = named.NamedExtensionManager(
WRITERS_NAMESPACE,
CONF.output.pipeline)
for writer in output_pipeline:
self.add_writer(writer.plugin)
def add_writer(self, writer_class): def add_writer(self, writer_class):
writer = writer_class(self, writer = writer_class(self,
@@ -110,117 +69,73 @@ class WriteOrchestrator(object):
self._basepath) self._basepath)
self._write_pipeline.append(writer) self._write_pipeline.append(writer)
def _gen_osrtf_filename(self, timeframe): def _update_state_manager_data(self):
if not isinstance(timeframe, datetime.datetime):
raise TypeError('timeframe should be of type datetime.')
date = '{}-{:02d}'.format(timeframe.year, timeframe.month)
filename = '{}-osrtf-{}.zip'.format(self._uid, date)
return filename
def _update_state_manager(self):
self._sm.set_state(self.usage_end) self._sm.set_state(self.usage_end)
metadata = {'total': self.total} metadata = {'total': self.total}
self._sm.set_metadata(metadata) self._sm.set_metadata(metadata)
def _get_state_manager_timeframe(self): def _load_state_manager_data(self):
timeframe = self._sm.get_state() timeframe = self._sm.get_state()
self.usage_start = datetime.datetime.fromtimestamp(timeframe) if timeframe:
end_frame = timeframe + self._period self.usage_start = timeframe
self.usage_end = datetime.datetime.fromtimestamp(end_frame) self.usage_end = self.usage_start + self._period
metadata = self._sm.get_metadata() metadata = self._sm.get_metadata()
self.total = metadata.get('total', 0) if metadata:
self.total = metadata.get('total', 0)
def _filter_period(self, json_data):
"""Detect the best usage period to extract.
Removes the usage from the json data and returns it.
"""
candidate_ts = None
candidate_idx = 0
for idx, usage in enumerate(json_data):
usage_ts = usage['period']['begin']
if candidate_ts is None or usage_ts < candidate_ts:
candidate_ts = usage_ts
candidate_idx = idx
if candidate_ts:
return candidate_ts, json_data.pop(candidate_idx)['usage']
def _format_data(self, timeframe, data):
beg = utils.dt2ts(timeframe)
end = beg + self._period
final_data = {'period': {'begin': beg, 'end': end}}
final_data['usage'] = data
return [final_data]
def _open_osrtf(self):
if self._osrtf is None:
self._osrtf = OSRTFBackend()
filename = self._gen_osrtf_filename(self.usage_start_dt)
if self._basepath:
self._osrtf_filename = os.path.join(self._basepath, filename)
self._osrtf.open(self._osrtf_filename)
def _pre_commit(self):
self._open_osrtf()
def _commit(self):
self._pre_commit()
self._osrtf.add(self.usage_start_dt, self._usage_data)
# Dispatch data to writing pipeline
for backend in self._write_pipeline:
backend.append(self._usage_data, self.usage_start, self.usage_end)
self._update_state_manager()
self._usage_data = {}
self._post_commit()
def _post_commit(self):
self._osrtf.close()
def _dispatch(self, data): def _dispatch(self, data):
for service in data: for service in data:
if service in self._usage_data:
self._usage_data[service].extend(data[service])
else:
self._usage_data[service] = data[service]
# Update totals # Update totals
for entry in data[service]: for entry in data[service]:
self.total += entry['billing']['price'] self.total += entry['billing']['price']
# Dispatch data to writing pipeline
for backend in self._write_pipeline:
backend.append(data, self.usage_start, self.usage_end)
def get_timeframe(self, timeframe): def get_timeframe(self, timeframe, timeframe_end=None):
self._open_osrtf() if not timeframe_end:
data = self._osrtf.get(timeframe) timeframe_end = timeframe + self._period
self._osrtf.close() try:
return self._format_data(timeframe, data) data = self._storage.get_time_frame(timeframe,
timeframe_end)
def append(self, raw_data): except storage.NoTimeFrame:
while raw_data: return None
usage_start, data = self._filter_period(raw_data) return data
if self.usage_end is not None and usage_start >= self.usage_end:
self._commit()
self.usage_start = None
if self.usage_start is None:
self.usage_start = usage_start
self.usage_end = usage_start + self._period
self.usage_start_dt = (
datetime.datetime.fromtimestamp(self.usage_start))
self.usage_end_dt = (
datetime.datetime.fromtimestamp(self.usage_end))
self._dispatch(data)
def commit(self):
self._commit()
def close(self): def close(self):
for writer in self._write_pipeline: for writer in self._write_pipeline:
writer.close() writer.close()
if self._osrtf is not None:
self._osrtf.close() def _push_data(self):
data = self.get_timeframe(self.usage_start, self.usage_end)
if data:
for timeframe in data:
self._dispatch(timeframe['usage'])
def _commit_data(self):
for backend in self._write_pipeline:
backend.commit()
def reset_state(self):
self._load_state_manager_data()
self.usage_end = self._storage.get_state()
self._update_state_manager_data()
def restart_month(self):
self._load_state_manager_data()
self.usage_end = ck_utils.get_this_month_timestamp()
self._update_state_manager_data()
def process(self):
self._load_state_manager_data()
storage_state = self._storage.get_state()
if not self.usage_start:
self.usage_start = storage_state
self.usage_end = self.usage_start + self._period
while storage_state > self.usage_start:
self._push_data()
self._commit_data()
self._update_state_manager_data()
self._load_state_manager_data()
storage_state = self._storage.get_state()
self.close()

View File

@@ -24,6 +24,7 @@ console_scripts =
cloudkitty-dbsync = cloudkitty.cli.dbsync:main cloudkitty-dbsync = cloudkitty.cli.dbsync:main
cloudkitty-processor = cloudkitty.cli.processor:main cloudkitty-processor = cloudkitty.cli.processor:main
cloudkitty-storage-init = cloudkitty.cli.storage:main cloudkitty-storage-init = cloudkitty.cli.storage:main
cloudkitty-writer = cloudkitty.cli.writer:main
cloudkitty.collector.backends = cloudkitty.collector.backends =
ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector ceilometer = cloudkitty.collector.ceilometer:CeilometerCollector